Răsfoiți Sursa

fix(sync): tolerate stale gzip headers in snapshot download

Tienson Qin 5 zile în urmă
părinte
comite
0c0d14e9fb

+ 57 - 32
src/main/frontend/worker/sync/download.cljs

@@ -58,19 +58,43 @@
       (<decompress-gzip-bytes chunk)
       chunk)))
 
-(defn- response-body-stream
+(defn- <stream-starts-with-gzip?
+  "Read the first chunk of `stream` and resolve to true when it starts with
+   the gzip magic bytes. Resolves to false for an empty stream or on any read
+   error. The stream is cancelled afterwards, so callers must pass a
+   disposable branch (e.g. one side of a `tee`), not the stream they consume."
+  [^js stream]
+  (let [reader (.getReader stream)]
+    (-> (.read reader)
+        (p/then (fn [result]
+                  (if (.-done result)
+                    false
+                    (gzip-bytes? (->uint8 (.-value result))))))
+        (p/catch (fn [_] false))
+        (p/finally (fn []
+                     ;; Cancel the probe branch: an abandoned tee branch would
+                     ;; otherwise buffer every chunk of the body in memory.
+                     (try
+                       (-> (.cancel reader)
+                           (p/catch (fn [_] nil)))
+                       (catch :default _))
+                     (try
+                       (.releaseLock reader)
+                       (catch :default _)))))))
+
+(defn- <response-body-stream
+  ;; Resolve to the response's readable body stream (nil when absent).
+  ;; A gzip content-encoding header is verified against the first body bytes
+  ;; via `tee`, because a stale header on an already-plain body would
+  ;; otherwise make DecompressionStream fail mid-download.
   [^js resp]
-  (let [encoding (some-> resp .-headers (.get "content-encoding"))]
+  (let [body (.-body resp)
+        encoding (some-> resp .-headers (.get "content-encoding"))]
     (cond
-      (nil? (.-body resp))
-      nil
-
-      (= "gzip" encoding)
-      (when (exists? js/DecompressionStream)
-        (.pipeThrough (.-body resp) (js/DecompressionStream. "gzip")))
+      (nil? body)
+      (p/resolved nil)
+
+      (and (= "gzip" encoding) (exists? js/DecompressionStream))
+      ;; Runtimes without ReadableStream.tee cannot probe; trust the header.
+      (if (fn? (.-tee body))
+        (let [branches (.tee body)
+              probe (aget branches 0)
+              payload (aget branches 1)]
+          (-> (<stream-starts-with-gzip? probe)
+              (p/then (fn [gzip?]
+                        (if gzip?
+                          (.pipeThrough payload (js/DecompressionStream. "gzip"))
+                          ;; Stale gzip header: serve the payload untouched.
+                          payload)))
+              ;; Probing is best-effort; on failure fall back to raw payload.
+              (p/catch (fn [_] payload))))
+        (p/resolved (.pipeThrough body (js/DecompressionStream. "gzip"))))

       :else
-      (.-body resp))))
+      (p/resolved body))))
 
 (defn- <flush-row-batches!
   [rows batch-size on-batch]
@@ -84,29 +108,30 @@
 
 (defn- <stream-snapshot-row-batches!
+  ;; Stream the snapshot response, calling `on-batch` with vectors of decoded
+  ;; rows (flushed every `batch-size` rows while chunks arrive). Resolves to
+  ;; {:chunk-count 0|1} — 1 when at least one batch was delivered.
   [^js resp batch-size on-batch]
-  (if-let [stream (response-body-stream resp)]
-    (let [reader (.getReader stream)]
-      (p/loop [buffer nil
-               pending []]
-        (p/let [result (.read reader)]
-          (if (.-done result)
-            (let [pending (if (and buffer (pos? (.-byteLength buffer)))
-                            (into pending (snapshot/finalize-framed-buffer buffer))
-                            pending)]
-              (if (seq pending)
-                (p/let [_ (on-batch pending)]
-                  {:chunk-count 1})
-                {:chunk-count 0}))
-            (let [{rows :rows next-buffer :buffer} (snapshot/parse-framed-chunk buffer (->uint8 (.-value result)))
-                  pending (into pending rows)]
-              (p/let [pending (<flush-row-batches! pending batch-size on-batch)]
-                (p/recur next-buffer pending)))))))
-    (p/let [snapshot-bytes (<snapshot-response-bytes resp)
-            rows (vec (snapshot/finalize-framed-buffer snapshot-bytes))]
-      (if (seq rows)
-        (p/let [_ (on-batch rows)]
-          {:chunk-count 1})
-        {:chunk-count 0}))))
+  ;; The body stream is now resolved asynchronously (it may be tee-probed
+  ;; before the decompression decision), hence the extra p/let layer.
+  (p/let [stream (<response-body-stream resp)]
+    (if stream
+      (let [reader (.getReader stream)]
+        (p/loop [buffer nil
+                 pending []]
+          (p/let [result (.read reader)]
+            (if (.-done result)
+              ;; Stream exhausted: decode any tail left in the frame buffer,
+              ;; then flush whatever rows are still pending.
+              (let [pending (if (and buffer (pos? (.-byteLength buffer)))
+                              (into pending (snapshot/finalize-framed-buffer buffer))
+                              pending)]
+                (if (seq pending)
+                  (p/let [_ (on-batch pending)]
+                    {:chunk-count 1})
+                  {:chunk-count 0}))
+              (let [{rows :rows next-buffer :buffer} (snapshot/parse-framed-chunk buffer (->uint8 (.-value result)))
+                    pending (into pending rows)]
+                (p/let [pending (<flush-row-batches! pending batch-size on-batch)]
+                  (p/recur next-buffer pending)))))))
+      ;; No readable body stream: buffer the whole response, emit one batch.
+      (p/let [snapshot-bytes (<snapshot-response-bytes resp)
+              rows (vec (snapshot/finalize-framed-buffer snapshot-bytes))]
+        (if (seq rows)
+          (p/let [_ (on-batch rows)]
+            {:chunk-count 1})
+          {:chunk-count 0})))))
 
 (defn- with-auth-headers
   [opts]

+ 44 - 0
src/test/frontend/worker/sync/download_test.cljs

@@ -0,0 +1,44 @@
+(ns frontend.worker.sync.download-test
+  (:require [cljs.test :refer [async deftest is]]
+            [frontend.worker.sync.download :as sync-download]
+            [logseq.db-sync.snapshot :as snapshot]
+            [promesa.core :as p]))
+
+(defn- frame-bytes
+  "Prefix `data` with its byte length as a 4-byte length header."
+  [^js data]
+  (let [byte-len (.-byteLength data)
+        framed (js/Uint8Array. (+ 4 byte-len))]
+    ;; littleEndian=false => the length prefix is written big-endian.
+    (.setUint32 (js/DataView. (.-buffer framed)) 0 byte-len false)
+    (.set framed data 4)
+    framed))
+
+(defn- stream-from-payload
+  "Build a ReadableStream that emits `payload` as its single chunk, then closes."
+  [^js payload]
+  (letfn [(start [^js controller]
+            (.enqueue controller payload)
+            (.close controller))]
+    (js/ReadableStream. #js {:start start})))
+
+(deftest stream-snapshot-row-batches-ignores-stale-gzip-header-test
+  ;; Regression: a response advertising content-encoding gzip whose body is
+  ;; actually a plain framed payload must still yield its rows instead of
+  ;; erroring inside DecompressionStream.
+  (async done
+         (let [rows [[1 "row-1" nil]
+                     [2 "row-2" nil]]
+               payload (frame-bytes (snapshot/encode-rows rows))
+               resp (js/Response.
+                     (stream-from-payload payload)
+                     #js {:status 200
+                          :headers #js {"content-encoding" "gzip"}})
+               batches* (atom [])]
+           ;; The fn under test is private; reach it through its var with #'.
+           (-> (#'sync-download/<stream-snapshot-row-batches!
+                resp
+                1000
+                (fn [batch]
+                  (swap! batches* conj batch)
+                  (p/resolved true)))
+               (p/then (fn [_]
+                         (is (= [rows] @batches*))
+                         (done)))
+               ;; Any rejection is a test failure; report it and finish.
+               (p/catch (fn [error]
+                          (is false (str error))
+                          (done)))))))