Bläddra i källkod

enhance(ux): graph download and upload logging

Tienson Qin 1 månad sedan
förälder
incheckning
1c4dd7cf7c

+ 18 - 3
src/main/frontend/handler/db_based/db_sync.cljs

@@ -252,7 +252,15 @@
                             (throw (ex-info "missing snapshot download url"
                                             {:graph graph-name
                                              :response download-resp})))
-                        resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))]
+                        resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))
+                        total-bytes (when-let [raw (some-> resp .-headers (.get "content-length"))]
+                                      (let [parsed (js/parseInt raw 10)]
+                                        (when-not (js/isNaN parsed) parsed)))
+                        _ (state/pub-event!
+                           [:rtc/log {:type :rtc.log/download
+                                      :sub-type :download-progress
+                                      :graph-uuid graph-uuid
+                                      :message (str "Start downloading graph snapshot, file size: " total-bytes)}])]
                   (when-not (.-ok resp)
                     (throw (ex-info "snapshot download failed"
                                     {:graph graph-name
@@ -263,20 +271,27 @@
                   (p/let [reader (.getReader (.-body resp))]
                     (p/loop [buffer nil
                              total 0
-                             total-rows []]
+                             total-rows []
+                             loaded 0]
                       (p/let [chunk (.read reader)]
                         (if (.-done chunk)
                           (let [rows (finalize-framed-buffer buffer)
                                 total' (+ total (count rows))
                                 total-rows' (into total-rows rows)]
+                            (state/pub-event!
+                             [:rtc/log {:type :rtc.log/download
+                                        :sub-type :download-completed
+                                        :graph-uuid graph-uuid
+                                        :message "Graph snapshot downloaded"}])
                             (when (seq total-rows')
                               (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
                                                        graph total-rows' true graph-uuid remote-tx))
                             total')
                           (let [value (.-value chunk)
+                                loaded' (+ loaded (.-byteLength value))
                                 {:keys [rows buffer]} (parse-framed-chunk buffer value)
                                 total' (+ total (count rows))]
-                            (p/recur buffer total' (into total-rows rows))))))))
+                            (p/recur buffer total' (into total-rows rows) loaded')))))))
                 (p/finally
                   (fn []
                     (when-let [download-url @download-url*]

+ 25 - 9
src/main/frontend/worker/db_sync.cljs

@@ -1354,6 +1354,13 @@
                  :bind #js [last-addr limit]
                  :rowMode "array"}))
 
+(defn- count-kvs-rows
+  [db]
+  (when-let [result (-> (.exec db #js {:sql "select count(*) from kvs"
+                                       :rowMode "array"})
+                        first)]
+    (first (bean/->clj result))))
+
 (defn- normalize-snapshot-rows [rows]
   (mapv (fn [row] (vec row)) (array-seq rows)))
 
@@ -1402,7 +1409,12 @@
   [repo]
   (->
    (let [base (http-base-url)
-         graph-id (get-graph-id repo)]
+         graph-id (get-graph-id repo)
+         update-progress (fn [payload]
+                           (worker-util/post-message :rtc-log
+                                                     (merge {:type :rtc.log/upload
+                                                             :graph-uuid graph-id}
+                                                            payload)))]
      (if (and (seq base) (seq graph-id))
        (if-let [source-conn (worker-state/get-datascript-conn repo)]
          (p/let [aes-key (<ensure-graph-aes-key repo graph-id)
@@ -1412,21 +1424,23 @@
            (ensure-client-graph-uuid! repo graph-id)
            (p/let [datoms (d/datoms @source-conn :eavt)
                    _ (prn :debug :datoms-count (count datoms) :time (js/Date.))
+                   _ (update-progress {:sub-type :upload-progress
+                                       :message "Encrypting data"})
                    encrypted-datoms (<encrypt-datoms aes-key datoms)
-                   _ (prn :debug :encrypted-datoms-count (count encrypted-datoms)
-                          :time (js/Date.))
-                   {:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)]
-             (prn :debug :created-temp-conn :time (js/Date.))
+                   {:keys [db] :as temp} (<create-temp-sqlite-conn (d/schema @source-conn) encrypted-datoms)
+                   total-rows (count-kvs-rows db)]
              (->
               (p/loop [last-addr -1
-                       first-batch? true]
+                       first-batch? true
+                       loaded 0]
                 (let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
-                  (prn :debug :rows-count (count rows))
                   (if (empty? rows)
                     (do
                       (client-op/remove-local-tx repo)
                       (client-op/update-local-tx repo 0)
                       (client-op/add-all-exists-asset-as-ops repo)
+                      (update-progress {:sub-type :upload-completed
+                                        :message "Graph upload finished!"})
                       {:graph-id graph-id})
                     (let [max-addr (apply max (map first rows))
                           rows (normalize-snapshot-rows rows)
@@ -1439,10 +1453,12 @@
                                              :headers headers
                                              :body body}
                                             {:response-schema :sync/snapshot-upload})]
-                        (p/recur max-addr false))))))
+                        (let [loaded' (+ loaded (count rows))]
+                          (update-progress {:sub-type :upload-progress
+                                            :message (str "Uploading " loaded "/" total-rows)})
+                          (p/recur max-addr false loaded')))))))
               (p/finally
                 (fn []
-                  (prn :debug :cleanup-temp-db :time (js/Date.))
                   (cleanup-temp-sqlite! temp))))))
          (p/rejected (ex-info "db-sync missing datascript conn"
                               {:repo repo :graph-id graph-id})))

+ 23 - 5
src/main/frontend/worker/db_worker.cljs

@@ -30,6 +30,7 @@
             [frontend.worker.rtc.core :as rtc.core]
             [frontend.worker.rtc.db-listener]
             [frontend.worker.rtc.debug-log :as rtc-debug-log]
+            [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
             [frontend.worker.rtc.migrate :as rtc-migrate]
             [frontend.worker.search :as search]
             [frontend.worker.shared-service :as shared-service]
@@ -59,6 +60,7 @@
             [logseq.outliner.core :as outliner-core]
             [logseq.outliner.op :as outliner-op]
             [me.tonsky.persistent-sorted-set :as set :refer [BTSet]]
+            [medley.core :as medley]
             [missionary.core :as m]
             [promesa.core :as p]))
 
@@ -620,10 +622,18 @@
   (<unlink-db! repo))
 
 (defn- import-datoms-to-db!
-  [repo remote-tx datoms]
+  [repo graph-id remote-tx datoms]
   (-> (p/do!
+       (rtc-log-and-state/rtc-log :rtc.log/download
+                                  {:sub-type :download-progress
+                                   :graph-uuid graph-id
+                                   :message "Saving data to DB"})
        ((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true
                                                                        :datoms datoms})
+       (rtc-log-and-state/rtc-log :rtc.log/download
+                                  {:sub-type :download-completed
+                                   :graph-uuid graph-id
+                                   :message "Graph is ready!"})
        ((@thread-api/*thread-apis :thread-api/export-db) repo)
        (client-op/update-local-tx repo remote-tx)
        (shared-service/broadcast-to-clients! :add-repo {:repo repo}))
@@ -637,16 +647,24 @@
           aes-key (db-sync/<fetch-graph-aes-key-for-download repo graph-id)
           _ (when (nil? aes-key)
               (db-sync/fail-fast :db-sync/missing-field {:repo repo :field :aes-key}))
-          batches (partition-all 100 rows)]
+          batches (medley/indexed (partition-all 100 rows))]
+    (rtc-log-and-state/rtc-log :rtc.log/download
+                               {:sub-type :download-progress
+                                :graph-uuid graph-id
+                                :message "Start decrypting data"})
     ;; sequential batches: low memory
-    (p/doseq [batch batches]
+    (p/doseq [[i batch] batches]
       (p/let [dec-rows (db-sync/<decrypt-snapshot-rows-batch aes-key batch)]
-        (upsert-addr-content! db (rows->sqlite-binds dec-rows))))
+        (upsert-addr-content! db (rows->sqlite-binds dec-rows))
+        (rtc-log-and-state/rtc-log :rtc.log/download
+                                   {:sub-type :download-progress
+                                    :graph-uuid graph-id
+                                    :message (str "Decrypting data " (inc i) "/" (count batches))})))
     (let [storage (new-sqlite-storage db)
           conn (common-sqlite/get-storage-conn storage db-schema/schema)
           datoms (vec (d/datoms @conn :eavt))]
       (.close db)
-      (import-datoms-to-db! repo remote-tx datoms))))
+      (import-datoms-to-db! repo graph-id remote-tx datoms))))
 
 (def-thread-api :thread-api/release-access-handles
   [repo]