Tienson Qin 2 недель назад
Родитель
Сommit
8bbc0fede8

+ 1 - 1
deps/db-sync/src/logseq/db_sync/common.cljs

@@ -10,7 +10,7 @@
   #js {"Access-Control-Allow-Origin" "*"
   #js {"Access-Control-Allow-Origin" "*"
        "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type"
        "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type"
        "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"
        "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"
-       "Access-Control-Expose-Headers" "content-type,content-encoding,cache-control,x-asset-type"})
+       "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type"})
 
 
 (defn json-response
 (defn json-response
   ([data] (json-response data 200))
   ([data] (json-response data 200))

+ 31 - 1
deps/db-sync/src/logseq/db_sync/worker/handler/assets.cljs

@@ -6,6 +6,26 @@
 
 
 (def ^:private max-asset-size (* 100 1024 1024))
 (def ^:private max-asset-size (* 100 1024 1024))
 
 
+(defn- parse-size
+  [size]
+  (cond
+    (number? size) size
+    (string? size) (let [n (js/parseInt size 10)]
+                     (when-not (js/isNaN n)
+                       n))
+    :else nil))
+
+(defn- maybe-fixed-length-body
+  [body size]
+  (if (and (number? size)
+           (exists? js/FixedLengthStream)
+           (some? body)
+           (fn? (.-pipeTo body)))
+    (let [^js fixed (js/FixedLengthStream. size)]
+      (.catch (.pipeTo body (.-writable fixed)) (fn [_] nil))
+      (.-readable fixed))
+    body))
+
 (defn parse-asset-path [path]
 (defn parse-asset-path [path]
   (let [prefix "/assets/"]
   (let [prefix "/assets/"]
     (when (string/starts-with? path prefix)
     (when (string/starts-with? path prefix)
@@ -46,8 +66,18 @@
                                                 "application/octet-stream")
                                                 "application/octet-stream")
                                content-encoding (.-contentEncoding metadata)
                                content-encoding (.-contentEncoding metadata)
                                cache-control (.-cacheControl metadata)
                                cache-control (.-cacheControl metadata)
+                               size (parse-size (or (.-size obj)
+                                                    (some-> (.-body obj) .-byteLength)))
+                               content-length (cond
+                                                (number? size) (str size)
+                                                (string? size) size
+                                                :else nil)
+                               body (maybe-fixed-length-body (.-body obj) size)
                                headers (cond-> {"content-type" content-type
                                headers (cond-> {"content-type" content-type
                                                 "x-asset-type" asset-type}
                                                 "x-asset-type" asset-type}
+                                         (and (string? content-length)
+                                              (pos? (.-length content-length)))
+                                         (assoc "content-length" content-length)
                                          (and (string? content-encoding)
                                          (and (string? content-encoding)
                                               (not= content-encoding "null")
                                               (not= content-encoding "null")
                                               (pos? (.-length content-encoding)))
                                               (pos? (.-length content-encoding)))
@@ -57,7 +87,7 @@
                                          (assoc "cache-control" cache-control)
                                          (assoc "cache-control" cache-control)
                                          true
                                          true
                                          (bean/->js))]
                                          (bean/->js))]
-                           (js/Response. (.-body obj)
+                           (js/Response. body
                                          #js {:status 200
                                          #js {:status 200
                                               :headers (js/Object.assign
                                               :headers (js/Object.assign
                                                         headers
                                                         headers

+ 23 - 4
deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs

@@ -86,6 +86,17 @@
     (.pipeThrough stream (js/DecompressionStream. "gzip"))
     (.pipeThrough stream (js/DecompressionStream. "gzip"))
     stream))
     stream))
 
 
+(defn- maybe-compress-stream [stream]
+  (if (exists? js/CompressionStream)
+    (.pipeThrough stream (js/CompressionStream. snapshot-content-encoding))
+    stream))
+
+(defn- <buffer-stream
+  [stream]
+  (p/let [resp (js/Response. stream)
+          buf (.arrayBuffer resp)]
+    buf))
+
 (defn- ->uint8 [data]
 (defn- ->uint8 [data]
   (cond
   (cond
     (instance? js/Uint8Array data) data
     (instance? js/Uint8Array data) data
@@ -303,23 +314,31 @@
         :else
         :else
         (p/let [snapshot-id (str (random-uuid))
         (p/let [snapshot-id (str (random-uuid))
                 key (snapshot-key graph-id snapshot-id)
                 key (snapshot-key graph-id snapshot-id)
+                use-compression? (exists? js/CompressionStream)
+                content-encoding (when use-compression? snapshot-content-encoding)
                 stream (snapshot-export-stream self)
                 stream (snapshot-export-stream self)
+                stream (if use-compression?
+                         (maybe-compress-stream stream)
+                         stream)
                 multipart? (and (some? (.-createMultipartUpload bucket))
                 multipart? (and (some? (.-createMultipartUpload bucket))
                                 (fn? (.-createMultipartUpload bucket)))
                                 (fn? (.-createMultipartUpload bucket)))
                 opts #js {:httpMetadata #js {:contentType snapshot-content-type
                 opts #js {:httpMetadata #js {:contentType snapshot-content-type
-                                             :contentEncoding nil
+                                             :contentEncoding content-encoding
                                              :cacheControl snapshot-cache-control}
                                              :cacheControl snapshot-cache-control}
                           :customMetadata #js {:purpose "snapshot"
                           :customMetadata #js {:purpose "snapshot"
                                                :created-at (str (common/now-ms))}}
                                                :created-at (str (common/now-ms))}}
                 _ (if multipart?
                 _ (if multipart?
                     (upload-multipart! bucket key stream opts)
                     (upload-multipart! bucket key stream opts)
-                    (p/let [body (snapshot-export-fixed-length self)]
-                      (.put bucket key body opts)))
+                    (if use-compression?
+                      (p/let [body (<buffer-stream stream)]
+                        (.put bucket key body opts))
+                      (p/let [body (snapshot-export-fixed-length self)]
+                        (.put bucket key body opts))))
                 url (snapshot-url request graph-id snapshot-id)]
                 url (snapshot-url request graph-id snapshot-id)]
           (http/json-response :sync/snapshot-download {:ok true
           (http/json-response :sync/snapshot-download {:ok true
                                                        :key key
                                                        :key key
                                                        :url url
                                                        :url url
-                                                       :content-encoding nil}))))
+                                                       :content-encoding content-encoding}))))
 
 
     :sync/admin-reset
     :sync/admin-reset
     (do
     (do

+ 2 - 0
deps/db-sync/test/logseq/db_sync/test_runner.cljs

@@ -3,6 +3,8 @@
             [logseq.db-sync.node-adapter-test]
             [logseq.db-sync.node-adapter-test]
             [logseq.db-sync.node-config-test]
             [logseq.db-sync.node-config-test]
             [logseq.db-sync.platform-test]
             [logseq.db-sync.platform-test]
+            [logseq.db-sync.worker-handler-assets-test]
+            [logseq.db-sync.worker-handler-sync-test]
             [logseq.db-sync.worker-handler-ws-test]
             [logseq.db-sync.worker-handler-ws-test]
             [shadow.test :as st]
             [shadow.test :as st]
             [shadow.test.env :as env]))
             [shadow.test.env :as env]))

+ 24 - 0
deps/db-sync/test/logseq/db_sync/worker_handler_assets_test.cljs

@@ -0,0 +1,24 @@
+(ns logseq.db-sync.worker-handler-assets-test
+  (:require [cljs.test :refer [async deftest is]]
+            [logseq.db-sync.worker.handler.assets :as assets]
+            [promesa.core :as p]))
+
+(deftest assets-get-includes-content-length-header-test
+  (async done
+         (let [payload (js/Uint8Array. #js [1 2 3 4])
+               request (js/Request. "http://localhost/assets/graph-1/snapshot-1.snapshot"
+                                    #js {:method "GET"})
+               env #js {:LOGSEQ_SYNC_ASSETS
+                        #js {:get (fn [_key]
+                                    (js/Promise.resolve
+                                     #js {:body payload
+                                          :size 4
+                                          :httpMetadata #js {:contentType "application/octet-stream"}}))}}]
+           (-> (p/let [resp (assets/handle request env)]
+                 (is (= 200 (.-status resp)))
+                 (is (= "4" (.get (.-headers resp) "content-length"))))
+               (p/then (fn []
+                         (done)))
+               (p/catch (fn [error]
+                          (is false (str error))
+                          (done)))))))

+ 78 - 0
deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs

@@ -0,0 +1,78 @@
+(ns logseq.db-sync.worker-handler-sync-test
+  (:require [cljs.test :refer [async deftest is]]
+            [logseq.db-sync.worker.handler.sync :as sync-handler]
+            [promesa.core :as p]))
+
+(defn- empty-sql []
+  #js {:exec (fn [& _] #js [])})
+
+(defn- request-url []
+  (let [request (js/Request. "http://localhost/sync/graph-1/snapshot/download?graph-id=graph-1"
+                             #js {:method "GET"})]
+    {:request request
+     :url (js/URL. (.-url request))}))
+
+(defn- passthrough-compression-stream-constructor []
+  (js* "function(_format){ return new TransformStream(); }"))
+
+(deftest snapshot-download-uses-gzip-encoding-when-compression-supported-test
+  (async done
+         (let [put-call (atom nil)
+               bucket #js {:put (fn [key body opts]
+                                  (reset! put-call {:key key :body body :opts opts})
+                                  (js/Promise.resolve #js {:ok true}))}
+               self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket}
+                         :sql (empty-sql)}
+               {:keys [request url]} (request-url)
+               original-compression-stream (.-CompressionStream js/globalThis)
+               restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
+           (aset js/globalThis
+                 "CompressionStream"
+                 (passthrough-compression-stream-constructor))
+           (-> (p/let [resp (sync-handler/handle {:self self
+                                                  :request request
+                                                  :url url
+                                                  :route {:handler :sync/snapshot-download}})
+                       text (.text resp)
+                       body (js->clj (js/JSON.parse text) :keywordize-keys true)
+                       http-metadata (aget (:opts @put-call) "httpMetadata")]
+                 (is (= 200 (.-status resp)))
+                 (is (= "gzip" (:content-encoding body)))
+                 (is (= "gzip" (aget http-metadata "contentEncoding"))))
+               (p/then (fn []
+                         (restore!)
+                         (done)))
+               (p/catch (fn [error]
+                          (restore!)
+                          (is false (str error))
+                          (done)))))))
+
+(deftest snapshot-download-falls-back-to-uncompressed-when-compression-unsupported-test
+  (async done
+         (let [put-call (atom nil)
+               bucket #js {:put (fn [key body opts]
+                                  (reset! put-call {:key key :body body :opts opts})
+                                  (js/Promise.resolve #js {:ok true}))}
+               self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket}
+                         :sql (empty-sql)}
+               {:keys [request url]} (request-url)
+               original-compression-stream (.-CompressionStream js/globalThis)
+               restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
+           (aset js/globalThis "CompressionStream" js/undefined)
+           (-> (p/let [resp (sync-handler/handle {:self self
+                                                  :request request
+                                                  :url url
+                                                  :route {:handler :sync/snapshot-download}})
+                       text (.text resp)
+                       body (js->clj (js/JSON.parse text) :keywordize-keys true)
+                       http-metadata (aget (:opts @put-call) "httpMetadata")]
+                 (is (= 200 (.-status resp)))
+                 (is (nil? (:content-encoding body)))
+                 (is (nil? (aget http-metadata "contentEncoding"))))
+               (p/then (fn []
+                         (restore!)
+                         (done)))
+               (p/catch (fn [error]
+                          (restore!)
+                          (is false (str error))
+                          (done)))))))

+ 44 - 27
src/main/frontend/handler/db_based/sync.cljs

@@ -86,6 +86,39 @@
         rows
         rows
         (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows}))))))
         (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows}))))))
 
 
+(defn- gzip-bytes?
+  [^js bytes]
+  (and (some? bytes)
+       (>= (.-byteLength bytes) 2)
+       (= 31 (aget bytes 0))
+       (= 139 (aget bytes 1))))
+
+(defn- bytes->stream
+  [^js bytes]
+  (js/ReadableStream.
+   #js {:start (fn [controller]
+                 (.enqueue controller bytes)
+                 (.close controller))}))
+
+(defn- <decompress-gzip-bytes
+  [^js bytes]
+  (if (exists? js/DecompressionStream)
+    (p/let [stream (bytes->stream bytes)
+            decompressed (.pipeThrough stream (js/DecompressionStream. "gzip"))
+            resp (js/Response. decompressed)
+            buf (.arrayBuffer resp)]
+      (->uint8 buf))
+    (p/rejected (ex-info "gzip decompression not supported"
+                         {:type :db-sync/decompression-not-supported}))))
+
+(defn- <snapshot-response-bytes
+  [^js resp]
+  (p/let [buf (.arrayBuffer resp)
+          bytes (->uint8 buf)]
+    (if (gzip-bytes? bytes)
+      (<decompress-gzip-bytes bytes)
+      bytes)))
+
 (defn- auth-headers []
 (defn- auth-headers []
   (when-let [token (state/get-auth-id-token)]
   (when-let [token (state/get-auth-id-token)]
     {"authorization" (str "Bearer " token)}))
     {"authorization" (str "Bearer " token)}))
@@ -286,33 +319,17 @@
                     (throw (ex-info "snapshot download failed"
                     (throw (ex-info "snapshot download failed"
                                     {:graph graph-name
                                     {:graph graph-name
                                      :status (.-status resp)})))
                                      :status (.-status resp)})))
-                  (when-not (.-body resp)
-                    (throw (ex-info "snapshot download missing body"
-                                    {:graph graph-name})))
-                  (p/let [reader (.getReader (.-body resp))]
-                    (p/loop [buffer nil
-                             total 0
-                             total-rows []
-                             loaded 0]
-                      (p/let [chunk (.read reader)]
-                        (if (.-done chunk)
-                          (let [rows (finalize-framed-buffer buffer)
-                                total' (+ total (count rows))
-                                total-rows' (into total-rows rows)]
-                            (state/pub-event!
-                             [:rtc/log {:type :rtc.log/download
-                                        :sub-type :download-completed
-                                        :graph-uuid graph-uuid
-                                        :message "Graph snapshot downloaded"}])
-                            (when (seq total-rows')
-                              (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
-                                                       graph total-rows' true graph-uuid remote-tx))
-                            total')
-                          (let [value (.-value chunk)
-                                loaded' (+ loaded (.-byteLength value))
-                                {:keys [rows buffer]} (parse-framed-chunk buffer value)
-                                total' (+ total (count rows))]
-                            (p/recur buffer total' (into total-rows rows) loaded')))))))
+                  (p/let [snapshot-bytes (<snapshot-response-bytes resp)
+                          rows (finalize-framed-buffer snapshot-bytes)]
+                    (state/pub-event!
+                     [:rtc/log {:type :rtc.log/download
+                                :sub-type :download-completed
+                                :graph-uuid graph-uuid
+                                :message "Graph snapshot downloaded"}])
+                    (when (seq rows)
+                      (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
+                                               graph rows true graph-uuid remote-tx))
+                    (count rows)))
                 (p/finally
                 (p/finally
                   (fn []
                   (fn []
                     (when-let [download-url @download-url*]
                     (when-let [download-url @download-url*]

+ 93 - 0
src/test/frontend/handler/db_based/sync_test.cljs

@@ -4,8 +4,35 @@
             [frontend.handler.db-based.sync :as db-sync]
             [frontend.handler.db-based.sync :as db-sync]
             [frontend.handler.user :as user-handler]
             [frontend.handler.user :as user-handler]
             [frontend.state :as state]
             [frontend.state :as state]
+            [logseq.db.sqlite.util :as sqlite-util]
             [promesa.core :as p]))
             [promesa.core :as p]))
 
 
+(def ^:private test-text-encoder (js/TextEncoder.))
+
+(defn- frame-bytes [^js data]
+  (let [len (.-byteLength data)
+        out (js/Uint8Array. (+ 4 len))
+        view (js/DataView. (.-buffer out))]
+    (.setUint32 view 0 len false)
+    (.set out data 4)
+    out))
+
+(defn- encode-framed-rows [rows]
+  (let [payload (.encode test-text-encoder (sqlite-util/write-transit-str rows))]
+    (frame-bytes payload)))
+
+(defn- <gzip-bytes [^js bytes]
+  (if (exists? js/CompressionStream)
+    (p/let [stream (js/ReadableStream.
+                    #js {:start (fn [controller]
+                                  (.enqueue controller bytes)
+                                  (.close controller))})
+            compressed (.pipeThrough stream (js/CompressionStream. "gzip"))
+            resp (js/Response. compressed)
+            buf (.arrayBuffer resp)]
+      (js/Uint8Array. buf))
+    (p/resolved bytes)))
+
 (deftest remove-member-request-test
 (deftest remove-member-request-test
   (async done
   (async done
          (let [called (atom nil)]
          (let [called (atom nil)]
@@ -110,3 +137,69 @@
         (is (= "graph-1" graph-uuid))
         (is (= "graph-1" graph-uuid))
         (is (and (string? message)
         (is (and (string? message)
                  (string/includes? message "Preparing")))))))
                  (string/includes? message "Preparing")))))))
+
+(deftest rtc-download-graph-imports-snapshot-once-test
+  (async done
+         (let [import-calls (atom [])
+               fetch-calls (atom [])
+               rows [[1 "content-1" "addresses-1"]
+                     [2 "content-2" "addresses-2"]]
+               framed-bytes (encode-framed-rows rows)
+               original-fetch js/fetch]
+           (-> (p/let [gzip-bytes (<gzip-bytes framed-bytes)]
+                 (set! js/fetch
+                       (fn [url opts]
+                         (let [method (or (aget opts "method") "GET")]
+                           (swap! fetch-calls conj [url method])
+                           (cond
+                             (and (= url "http://snapshot") (= method "GET"))
+                             (js/Promise.resolve
+                              #js {:ok true
+                                   :status 200
+                                   :headers #js {:get (fn [header]
+                                                        (when (= header "content-length")
+                                                          (str (.-byteLength gzip-bytes))))}
+                                   :arrayBuffer (fn [] (js/Promise.resolve (.-buffer gzip-bytes)))})
+
+                             :else
+                             (js/Promise.resolve
+                              #js {:ok true
+                                   :status 200})))))
+                 (-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
+                                     db-sync/fetch-json (fn [url _opts _schema]
+                                                          (cond
+                                                            (string/ends-with? url "/pull")
+                                                            (p/resolved {:t 42})
+
+                                                            (string/ends-with? url "/snapshot/download")
+                                                            (p/resolved {:url "http://snapshot"})
+
+                                                            :else
+                                                            (p/rejected (ex-info "unexpected fetch-json URL"
+                                                                                 {:url url}))))
+                                     user-handler/task--ensure-id&access-token (fn [resolve _reject]
+                                                                                 (resolve true))
+                                     state/<invoke-db-worker (fn [& args]
+                                                               (swap! import-calls conj args)
+                                                               (p/resolved :ok))
+                                     state/set-state! (fn [& _] nil)
+                                     state/pub-event! (fn [& _] nil)]
+                       (db-sync/<rtc-download-graph! "demo-graph" "graph-1"))
+                     (p/finally (fn [] (set! js/fetch original-fetch)))))
+               (p/then (fn [_]
+                         (is (= 1 (count @import-calls)))
+                         (let [[op graph imported-rows reset? graph-uuid remote-tx] (first @import-calls)]
+                           (is (= :thread-api/db-sync-import-kvs-rows op))
+                           (is (string/ends-with? graph "demo-graph"))
+                           (is (= rows imported-rows))
+                           (is (= true reset?))
+                           (is (= "graph-1" graph-uuid))
+                           (is (= 42 remote-tx)))
+                         (is (= [["http://snapshot" "GET"]
+                                 ["http://snapshot" "DELETE"]]
+                                @fetch-calls))
+                         (done)))
+               (p/catch (fn [error]
+                          (set! js/fetch original-fetch)
+                          (is false (str error))
+                          (done)))))))