Browse Source

use R2 for both graph snapshot upload and download

Tienson Qin 1 month ago
parent
commit
83a3a7cdf9

+ 3 - 2
deps/db-sync/src/logseq/db_sync/common.cljs

@@ -7,8 +7,9 @@
 
 (defn cors-headers []
   #js {"Access-Control-Allow-Origin" "*"
-       "Access-Control-Allow-Headers" "content-type,authorization,x-amz-meta-checksum,x-amz-meta-type"
-       "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"})
+       "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type"
+       "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"
+       "Access-Control-Expose-Headers" "content-type,content-encoding,cache-control,x-asset-type"})
 
 (defn json-response
   ([data] (json-response data 200))

+ 11 - 23
deps/db-sync/src/logseq/db_sync/malli_schema.cljs

@@ -157,29 +157,18 @@
    [:t-before :int]
    [:txs :string]])
 
-(def snapshot-row-schema
-  [:or
-   [:tuple :int :string [:maybe :any]]
-   [:map
-    [:addr :int]
-    [:content :string]
-    [:addresses {:optional true} :any]]])
-
-(def snapshot-rows-response-schema
+(def snapshot-download-response-schema
   [:map
-   [:rows [:sequential snapshot-row-schema]]
-   [:last-addr :int]
-   [:done :boolean]])
-
-(def snapshot-import-request-schema
-  [:map
-   [:reset {:optional true} :boolean]
-   [:rows [:sequential [:tuple :int :string [:maybe :any]]]]])
+   [:ok :boolean]
+   [:key :string]
+   [:url :string]
+   [:content-encoding {:optional true} [:maybe :string]]])
 
-(def snapshot-import-response-schema
+(def snapshot-upload-response-schema
   [:map
    [:ok :boolean]
-   [:count :int]])
+   [:count :int]
+   [:key :string]])
 
 (def asset-get-response-schema
   [:or
@@ -190,8 +179,7 @@
   {:graphs/create graph-create-request-schema
    :graph-members/create graph-member-create-request-schema
    :graph-members/update graph-member-update-request-schema
-   :sync/tx-batch tx-batch-request-schema
-   :sync/snapshot-import snapshot-import-request-schema})
+   :sync/tx-batch tx-batch-request-schema})
 
 (def http-response-schemas
   {:graphs/list graphs-list-response-schema
@@ -206,8 +194,8 @@
    :sync/health http-ok-response-schema
    :sync/pull pull-ok-schema
    :sync/tx-batch [:or tx-batch-ok-schema tx-reject-schema http-error-response-schema]
-   :sync/snapshot-rows snapshot-rows-response-schema
-   :sync/snapshot-import snapshot-import-response-schema
+   :sync/snapshot-download snapshot-download-response-schema
+   :sync/snapshot-upload snapshot-upload-response-schema
    :sync/admin-reset http-ok-response-schema
    :assets/get asset-get-response-schema
    :assets/put http-ok-response-schema

+ 79 - 0
deps/db-sync/src/logseq/db_sync/snapshot.cljs

@@ -0,0 +1,79 @@
+(ns logseq.db-sync.snapshot
+  (:require [cognitect.transit :as transit]))
+
+(def ^:private transit-w (transit/writer :json))
+(def ^:private transit-r (transit/reader :json))
+(def ^:private text-decoder (js/TextDecoder.))
+
+(defn- ->uint8
+  [data]
+  (cond
+    (instance? js/Uint8Array data) data
+    (instance? js/ArrayBuffer data) (js/Uint8Array. data)
+    (string? data) (.encode (js/TextEncoder.) data)
+    :else (js/Uint8Array. data)))
+
+(defn encode-rows
+  [rows]
+  (->uint8 (transit/write transit-w rows)))
+
+(defn decode-rows
+  [bytes]
+  (transit/read transit-r (.decode text-decoder (->uint8 bytes))))
+
+(defn frame-bytes
+  [^js bytes]
+  (let [len (.-byteLength bytes)
+        out (js/Uint8Array. (+ 4 len))
+        view (js/DataView. (.-buffer out))]
+    (.setUint32 view 0 len false)
+    (.set out bytes 4)
+    out))
+
+(defn concat-bytes
+  [^js a ^js b]
+  (cond
+    (nil? a) b
+    (nil? b) a
+    :else
+    (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
+      (.set out a 0)
+      (.set out b (.-byteLength a))
+      out)))
+
+(defn parse-framed-chunk
+  [buffer chunk]
+  (let [data (concat-bytes buffer chunk)
+        total (.-byteLength data)]
+    (loop [offset 0
+           rows []]
+      (if (< (- total offset) 4)
+        {:rows rows
+         :buffer (when (< offset total)
+                   (.slice data offset total))}
+        (let [view (js/DataView. (.-buffer data) offset 4)
+              len (.getUint32 view 0 false)
+              next-offset (+ offset 4 len)]
+          (if (<= next-offset total)
+            (let [payload (.slice data (+ offset 4) next-offset)
+                  decoded (decode-rows payload)]
+              (recur next-offset (into rows decoded)))
+            {:rows rows
+             :buffer (.slice data offset total)}))))))
+
+(defn finalize-framed-buffer
+  [buffer]
+  (if (or (nil? buffer) (zero? (.-byteLength buffer)))
+    []
+    (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)]
+      (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer))))
+        rows
+        (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows}))))))
+
+(defn framed-length
+  [rows-batches]
+  (reduce (fn [total rows]
+            (let [payload (encode-rows rows)]
+              (+ total 4 (.-byteLength payload))))
+          0
+          rows-batches))

+ 319 - 62
deps/db-sync/src/logseq/db_sync/worker.cljs

@@ -1,5 +1,6 @@
 (ns logseq.db-sync.worker
   (:require ["cloudflare:workers" :refer [DurableObject]]
+            [cljs-bean.core :as bean]
             [clojure.string :as string]
             [lambdaisland.glogi :as log]
             [lambdaisland.glogi.console :as glogi-console]
@@ -9,6 +10,7 @@
             [logseq.db-sync.index :as index]
             [logseq.db-sync.malli-schema :as db-sync-schema]
             [logseq.db-sync.protocol :as protocol]
+            [logseq.db-sync.snapshot :as snapshot]
             [logseq.db-sync.storage :as storage]
             [promesa.core :as p]
             [shadow.cljs.modern :refer (defclass)]))
@@ -223,8 +225,12 @@
         n))))
 
 (def ^:private max-asset-size (* 100 1024 1024))
-(def ^:private snapshot-rows-default-limit 500)
-(def ^:private snapshot-rows-max-limit 2000)
+(def ^:private snapshot-download-batch-size 500)
+(def ^:private snapshot-cache-control "private, max-age=300")
+(def ^:private snapshot-content-type "application/transit+json")
+(def ^:private snapshot-content-encoding "gzip")
+;; 10 MiB minimum multipart part size
+(def ^:private snapshot-multipart-part-size (* 10 1024 1024))
 
 (defn- fetch-kvs-rows
   [sql after limit]
@@ -234,14 +240,216 @@
                     after
                     limit)))
 
-(defn- snapshot-row->map [row]
+(defn- snapshot-row->tuple [row]
   (if (array? row)
-    {:addr (aget row 0)
-     :content (aget row 1)
-     :addresses (aget row 2)}
-    {:addr (aget row "addr")
-     :content (aget row "content")
-     :addresses (aget row "addresses")}))
+    [(aget row 0) (aget row 1) (aget row 2)]
+    [(aget row "addr") (aget row "content") (aget row "addresses")]))
+
+(defn- ensure-import-table! [sql]
+  (common/sql-exec sql "drop table if exists kvs_import")
+  (common/sql-exec sql "create table if not exists kvs_import (addr INTEGER primary key, content TEXT, addresses JSON)"))
+
+(defn- import-snapshot-rows!
+  [sql table rows]
+  (when (seq rows)
+    (doseq [[addr content addresses] rows]
+      (common/sql-exec sql
+                       (str "insert into " table " (addr, content, addresses) values (?, ?, ?)"
+                            " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses")
+                       addr
+                       content
+                       addresses))))
+
+(defn- finalize-import!
+  [^js self reset?]
+  (let [sql (.-sql self)
+        state (.-state self)]
+    (ensure-schema! self)
+    (if (and state (.-storage state) (.-transactionSync (.-storage state)))
+      (.transactionSync (.-storage state)
+                        (fn []
+                          (if reset?
+                            (do
+                              (common/sql-exec sql "delete from kvs")
+                              (common/sql-exec sql "insert into kvs (addr, content, addresses) select addr, content, addresses from kvs_import")
+                              (common/sql-exec sql "delete from tx_log")
+                              (common/sql-exec sql "delete from sync_meta")
+                              (storage/set-t! sql 0))
+                            (do
+                              (common/sql-exec sql "delete from kvs where addr in (select addr from kvs_import)")
+                              (common/sql-exec sql "insert into kvs (addr, content, addresses) select addr, content, addresses from kvs_import")))
+                          (common/sql-exec sql "drop table if exists kvs_import")))
+      (do
+        (log/error :db-sync/transaction-missing {:reset reset?})
+        (throw (ex-info "missing durable object transaction" {:reset reset?}))))
+    (set! (.-conn self) (storage/open-conn sql))))
+
+(defn- graph-id-from-request [request]
+  (let [header-id (.get (.-headers request) "x-graph-id")
+        url (js/URL. (.-url request))
+        param-id (.get (.-searchParams url) "graph-id")]
+    (when (seq (or header-id param-id))
+      (or header-id param-id))))
+
+(defn- snapshot-key [graph-id snapshot-id]
+  (str graph-id "/" snapshot-id ".snapshot"))
+
+(defn- snapshot-url [request graph-id snapshot-id]
+  (let [url (js/URL. (.-url request))]
+    (str (.-origin url) "/assets/" graph-id "/" snapshot-id ".snapshot")))
+
+(defn- maybe-compress-stream [stream]
+  (if (exists? js/CompressionStream)
+    (.pipeThrough stream (js/CompressionStream. "gzip"))
+    stream))
+
+(defn- maybe-decompress-stream [stream encoding]
+  (if (and (= encoding snapshot-content-encoding) (exists? js/DecompressionStream))
+    (.pipeThrough stream (js/DecompressionStream. "gzip"))
+    stream))
+
+(defn- ->uint8 [data]
+  (cond
+    (instance? js/Uint8Array data) data
+    (instance? js/ArrayBuffer data) (js/Uint8Array. data)
+    :else (js/Uint8Array. data)))
+
+(defn- concat-uint8 [^js a ^js b]
+  (cond
+    (nil? a) b
+    (nil? b) a
+    :else
+    (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
+      (.set out a 0)
+      (.set out b (.-byteLength a))
+      out)))
+
+(defn- snapshot-export-stream [^js self]
+  (let [sql (.-sql self)
+        state (volatile! {:after -1 :done? false})]
+    (js/ReadableStream.
+     #js {:pull (fn [controller]
+                  (p/let [{:keys [after done?]} @state]
+                    (if done?
+                      (.close controller)
+                      (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)
+                            rows (mapv snapshot-row->tuple rows)
+                            last-addr (if (seq rows)
+                                        (apply max (map first rows))
+                                        after)
+                            done? (< (count rows) snapshot-download-batch-size)]
+                        (when (seq rows)
+                          (let [payload (snapshot/encode-rows rows)
+                                framed (snapshot/frame-bytes payload)]
+                            (.enqueue controller framed)))
+                        (vswap! state assoc :after last-addr :done? done?)))))})))
+
+(defn- upload-multipart!
+  [^js bucket key stream opts]
+  (p/let [^js upload (.createMultipartUpload bucket key opts)]
+    (let [reader (.getReader stream)]
+      (-> (p/loop [buffer nil
+                   part-number 1
+                   parts []]
+            (p/let [chunk (.read reader)]
+              (if (.-done chunk)
+                (cond
+                  (and buffer (pos? (.-byteLength buffer)))
+                  (p/let [^js resp (.uploadPart upload part-number buffer)
+                          parts (conj parts {:partNumber part-number :etag (.-etag resp)})]
+                    (p/let [_ (.complete upload (clj->js parts))]
+                      {:ok true}))
+
+                  (seq parts)
+                  (p/let [_ (.complete upload (clj->js parts))]
+                    {:ok true})
+
+                  :else
+                  (p/let [_ (.abort upload)]
+                    (.put bucket key (js/Uint8Array. 0) opts)))
+                (let [value (.-value chunk)
+                      buffer (concat-uint8 buffer (->uint8 value))]
+                  (if (>= (.-byteLength buffer) snapshot-multipart-part-size)
+                    (let [part (.slice buffer 0 snapshot-multipart-part-size)
+                          rest-parts (.slice buffer snapshot-multipart-part-size (.-byteLength buffer))]
+                      (p/let [^js resp (.uploadPart upload part-number part)
+                              parts (conj parts {:partNumber part-number :etag (.-etag resp)})]
+                        (p/recur rest-parts (inc part-number) parts)))
+                    (p/recur buffer part-number parts))))))
+          (p/catch (fn [error]
+                     (.abort upload)
+                     (throw error)))))))
+
+(defn- snapshot-export-length [^js self]
+  (let [sql (.-sql self)]
+    (p/loop [after -1
+             total 0]
+      (let [rows (fetch-kvs-rows sql after snapshot-download-batch-size)]
+        (if (empty? rows)
+          total
+          (let [rows (mapv snapshot-row->tuple rows)
+                payload (snapshot/encode-rows rows)
+                total (+ total 4 (.-byteLength payload))
+                last-addr (apply max (map first rows))
+                done? (< (count rows) snapshot-download-batch-size)]
+            (if done?
+              total
+              (p/recur last-addr total))))))))
+
+(defn- snapshot-export-fixed-length [^js self]
+  (p/let [length (snapshot-export-length self)
+          stream (snapshot-export-stream self)]
+    (if (exists? js/FixedLengthStream)
+      (let [^js fixed (js/FixedLengthStream. length)
+            readable (.-readable fixed)
+            writable (.-writable fixed)
+            reader (.getReader stream)
+            writer (.getWriter writable)]
+        (p/let [_ (p/loop []
+                    (p/let [chunk (.read reader)]
+                      (if (.-done chunk)
+                        (.close writer)
+                        (p/let [_ (.write writer (.-value chunk))]
+                          (p/recur)))))]
+          readable))
+      (p/let [resp (js/Response. stream)
+              buf (.arrayBuffer resp)]
+        buf))))
+
+(declare import-snapshot!)
+(defn- import-snapshot-stream! [^js self stream reset?]
+  (let [reader (.getReader stream)
+        reset-pending? (volatile! reset?)
+        total-count (volatile! 0)]
+    (ensure-import-table! (.-sql self))
+    (p/let [buffer nil]
+      (p/catch
+       (p/loop [buffer buffer]
+         (p/let [chunk (.read reader)]
+           (if (.-done chunk)
+             (let [rows (snapshot/finalize-framed-buffer buffer)
+                   rows-count (count rows)
+                   reset? (and @reset-pending? true)]
+               (when (or reset? (seq rows))
+                 (import-snapshot! self rows reset?)
+                 (vreset! reset-pending? false))
+               (vswap! total-count + rows-count)
+               (finalize-import! self reset?)
+               @total-count)
+             (let [value (.-value chunk)
+                   {:keys [rows buffer]} (snapshot/parse-framed-chunk buffer value)
+                   rows-count (count rows)
+                   reset? (and @reset-pending? (seq rows))]
+               (when (seq rows)
+                 (import-snapshot! self rows (true? reset?))
+                 (vreset! reset-pending? false))
+               (vswap! total-count + rows-count)
+               (p/recur buffer)))))
+       (fn [error]
+         (try
+           (common/sql-exec (.-sql self) "drop table if exists kvs_import")
+           (catch :default _))
+         (throw error))))))
 
 (defn- handle-assets [request ^js env]
   (let [url (js/URL. (.-url request))
@@ -262,13 +470,26 @@
                      (fn [^js obj]
                        (if (nil? obj)
                          (error-response "not found" 404)
-                         (let [content-type (or (.-contentType (.-httpMetadata obj))
-                                                "application/octet-stream")]
+                         (let [metadata (.-httpMetadata obj)
+                               content-type (or (.-contentType metadata)
+                                                "application/octet-stream")
+                               content-encoding (.-contentEncoding metadata)
+                               cache-control (.-cacheControl metadata)
+                               headers (cond-> {"content-type" content-type
+                                                "x-asset-type" asset-type}
+                                         (and (string? content-encoding)
+                                              (not= content-encoding "null")
+                                              (pos? (.-length content-encoding)))
+                                         (assoc "content-encoding" content-encoding)
+                                         (and (string? cache-control)
+                                              (pos? (.-length cache-control)))
+                                         (assoc "cache-control" cache-control)
+                                         true
+                                         (bean/->js))]
                            (js/Response. (.-body obj)
                                          #js {:status 200
                                               :headers (js/Object.assign
-                                                        #js {"content-type" content-type
-                                                             "x-asset-type" asset-type}
+                                                        headers
                                                         (cors-headers))})))))
 
               "PUT"
@@ -330,7 +551,7 @@
             tail (if (neg? slash-idx)
                    "/"
                    (subs rest-path slash-idx))
-            new-url (str (.-origin url) tail (.-search url))]
+            new-url (js/URL. (str (.-origin url) tail (.-search url)))]
         (if (seq graph-id)
           (if (= method "OPTIONS")
             (common/options-response)
@@ -341,8 +562,10 @@
                       stub (.get namespace do-id)]
                   (if (common/upgrade-request? request)
                     (.fetch stub request)
-                    (let [rewritten (js/Request. new-url request)]
-                      (.fetch stub rewritten))))
+                    (do
+                      (.set (.-searchParams new-url) "graph-id" graph-id)
+                      (let [rewritten (js/Request. (.toString new-url) request)]
+                        (.fetch stub rewritten)))))
                 access-resp)))
           (bad-request "missing graph id")))
 
@@ -370,25 +593,10 @@
 ;;      :t (t-now self)
 ;;      :datoms (common/write-transit datoms)}))
 
-(defn- import-snapshot! [^js self rows reset?]
+(defn- import-snapshot! [^js self rows _reset?]
   (let [sql (.-sql self)]
     (ensure-schema! self)
-    (when reset?
-      (common/sql-exec sql "delete from kvs")
-      (common/sql-exec sql "delete from tx_log")
-      (common/sql-exec sql "delete from sync_meta")
-      (storage/init-schema! sql)
-      (set! (.-schema-ready self) true)
-      (storage/set-t! sql 0))
-    (when (seq rows)
-      (doseq [[addr content addresses] rows]
-        (common/sql-exec sql
-                         (str "insert into kvs (addr, content, addresses) values (?, ?, ?)"
-                              " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses")
-                         addr
-                         content
-                         addresses)))
-    (set! (.-conn self) (storage/open-conn sql))))
+    (import-snapshot-rows! sql "kvs_import" rows)))
 
 (defn- apply-tx! [^js self sender txs]
   (let [sql (.-sql self)]
@@ -512,22 +720,36 @@
             ;; (and (= method "GET") (= path "/snapshot"))
             ;; (common/json-response (snapshot-response self))
 
-            (and (= method "GET") (= path "/snapshot/rows"))
-            (let [after (or (parse-int (.get (.-searchParams url) "after")) -1)
-                  limit (or (parse-int (.get (.-searchParams url) "limit"))
-                            snapshot-rows-default-limit)
-                  limit (-> limit
-                            (max 1)
-                            (min snapshot-rows-max-limit))
-                  rows (fetch-kvs-rows (.-sql self) after limit)
-                  rows (mapv snapshot-row->map rows)
-                  last-addr (if (seq rows)
-                              (apply max (map :addr rows))
-                              after)
-                  done? (< (count rows) limit)]
-              (json-response :sync/snapshot-rows {:rows rows
-                                                  :last-addr last-addr
-                                                  :done done?}))
+            (and (= method "GET") (= path "/snapshot/download"))
+            (let [graph-id (graph-id-from-request request)
+                  ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))]
+              (cond
+                (not (seq graph-id))
+                (bad-request "missing graph id")
+
+                (nil? bucket)
+                (error-response "missing assets bucket" 500)
+
+                :else
+                (p/let [snapshot-id (str (random-uuid))
+                        key (snapshot-key graph-id snapshot-id)
+                        stream (snapshot-export-stream self)
+                        multipart? (and (some? (.-createMultipartUpload bucket))
+                                        (fn? (.-createMultipartUpload bucket)))
+                        opts #js {:httpMetadata #js {:contentType snapshot-content-type
+                                                     :contentEncoding nil
+                                                     :cacheControl snapshot-cache-control}
+                                  :customMetadata #js {:purpose "snapshot"
+                                                       :created-at (str (common/now-ms))}}
+                        _ (if multipart?
+                            (upload-multipart! bucket key stream opts)
+                            (p/let [body (snapshot-export-fixed-length self)]
+                              (.put bucket key body opts)))
+                        url (snapshot-url request graph-id snapshot-id)]
+                  (json-response :sync/snapshot-download {:ok true
+                                                          :key key
+                                                          :url url
+                                                          :content-encoding nil}))))
 
             (and (= method "DELETE") (= path "/admin/reset"))
             (do
@@ -554,19 +776,54 @@
                                (json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before))
                                (bad-request "invalid tx"))))))))
 
-            (and (= method "POST") (= path "/snapshot/import"))
-            (.then (common/read-json request)
-                   (fn [result]
-                     (if (nil? result)
-                       (bad-request "missing body")
-                       (let [body (js->clj result :keywordize-keys true)
-                             body (coerce-http-request :sync/snapshot-import body)]
-                         (if (nil? body)
-                           (bad-request "invalid body")
-                           (let [{:keys [rows reset]} body]
-                             (import-snapshot! self rows reset)
-                             (json-response :sync/snapshot-import {:ok true
-                                                                   :count (count rows)})))))))
+            (and (= method "POST") (= path "/snapshot/upload"))
+            (let [graph-id (graph-id-from-request request)
+                  ^js bucket (.-LOGSEQ_SYNC_ASSETS (.-env self))
+                  reset-param (.get (.-searchParams url) "reset")
+                  reset? (if (nil? reset-param)
+                           true
+                           (not (contains? #{"false" "0"} reset-param)))]
+              (cond
+                (not (seq graph-id))
+                (bad-request "missing graph id")
+
+                (nil? bucket)
+                (error-response "missing assets bucket" 500)
+
+                (nil? (.-body request))
+                (bad-request "missing body")
+
+                :else
+                (p/let [snapshot-id (str (random-uuid))
+                        key (snapshot-key graph-id snapshot-id)
+                        req-encoding (.get (.-headers request) "content-encoding")
+                        _ (.put bucket key
+                                (.-body request)
+                                #js {:httpMetadata #js {:contentType snapshot-content-type
+                                                        :contentEncoding req-encoding
+                                                        :cacheControl snapshot-cache-control}
+                                     :customMetadata #js {:purpose "snapshot"
+                                                          :created-at (str (common/now-ms))}})
+                        ^js obj (.get bucket key)]
+                  (if (nil? obj)
+                    (error-response "snapshot missing" 500)
+                    (p/catch
+                     (let [metadata (.-httpMetadata obj)
+                           encoding (or (.-contentEncoding metadata) req-encoding)
+                           stream (.-body obj)]
+                       (if (and (= encoding snapshot-content-encoding)
+                                (not (exists? js/DecompressionStream)))
+                         (p/let [_ (.delete bucket key)]
+                           (error-response "gzip not supported" 500))
+                         (p/let [stream (maybe-decompress-stream stream encoding)
+                                 count (import-snapshot-stream! self stream reset?)
+                                 _ (.delete bucket key)]
+                           (json-response :sync/snapshot-upload {:ok true
+                                                                 :count count
+                                                                 :key key}))))
+                     (fn [error]
+                       (p/let [_ (.delete bucket key)]
+                         (throw error))))))))
 
             :else
             (not-found))))

+ 40 - 0
deps/db-sync/test/logseq/db_sync/snapshot_import_test.cljs

@@ -0,0 +1,40 @@
+(ns logseq.db-sync.snapshot-import-test
+  (:require [cljs.test :refer [deftest is async]]
+            [clojure.string :as string]
+            [logseq.db-sync.snapshot :as snapshot]
+            [logseq.db-sync.worker :as worker]
+            [promesa.core :as p]))
+
+(defn- make-sql [state]
+  #js {:exec (fn [sql & _args]
+               (swap! state update :executed conj sql)
+               nil)})
+
+(defn- make-stream [chunk]
+  (js/ReadableStream.
+   #js {:start (fn [controller]
+                 (.enqueue controller chunk)
+                 (.close controller))}))
+
+(deftest snapshot-import-failure-does-not-touch-kvs-test
+  (async done
+         (let [state (atom {:executed []})
+               sql (make-sql state)
+               self (doto (js-obj)
+                      (aset "sql" sql))]
+           (-> (with-redefs [snapshot/parse-framed-chunk (fn [_ _]
+                                                           (throw (ex-info "boom" {})))]
+                 (-> (p/then (#'worker/import-snapshot-stream!
+                              self
+                              (make-stream (js/Uint8Array. #js [1 2 3]))
+                              true)
+                             (fn [_]
+                               (is false "expected import to fail")
+                               nil))
+                     (p/catch (fn [_]
+                                (let [sqls (:executed @state)]
+                                  (is (some #(string/includes? % "drop table if exists kvs_import") sqls))
+                                  (is (not-any? #(string/includes? % "insert into kvs ") sqls))
+                                  (is (not-any? #(string/includes? % "delete from kvs") sqls)))
+                                nil))))
+               (p/finally (fn [] (done)))))))

+ 44 - 0
deps/db-sync/test/logseq/db_sync/snapshot_test.cljs

@@ -0,0 +1,44 @@
+(ns logseq.db-sync.snapshot-test
+  (:require [cljs.test :refer [deftest is testing]]
+            [logseq.db-sync.snapshot :as snapshot]))
+
+(deftest transit-frame-roundtrip-test
+  (testing "framed transit json roundtrips rows"
+    (let [expected [{:addr 1 :content "a" :addresses nil}
+                    {:addr 2 :content "b" :addresses "{\"k\":1}"}]
+          frame (snapshot/frame-bytes (snapshot/encode-rows expected))
+          {:keys [rows buffer]} (snapshot/parse-framed-chunk nil frame)]
+      (is (= rows expected))
+      (is (or (nil? buffer) (zero? (.-byteLength buffer)))))))
+
+(deftest transit-frame-split-test
+  (testing "parse-framed-chunk handles partial trailing frame"
+    (let [rows1 [{:addr 1 :content "a" :addresses nil}]
+          rows2 [{:addr 2 :content "b" :addresses nil}]
+          frame1 (snapshot/frame-bytes (snapshot/encode-rows rows1))
+          frame2 (snapshot/frame-bytes (snapshot/encode-rows rows2))
+          split-pos (- (.-byteLength frame2) 3)
+          part1 (.slice frame2 0 split-pos)
+          part2 (.slice frame2 split-pos (.-byteLength frame2))
+          {rows1-parsed :rows buffer :buffer} (snapshot/parse-framed-chunk nil (snapshot/concat-bytes frame1 part1))
+          {rows2-parsed :rows rows-buffer :buffer} (snapshot/parse-framed-chunk buffer part2)]
+      (is (= rows1-parsed rows1))
+      (is (= rows2-parsed rows2))
+      (is (or (nil? rows-buffer) (zero? (.-byteLength rows-buffer)))))))
+
+(deftest transit-finalize-buffer-test
+  (testing "finalize-framed-buffer parses remaining frame"
+    (let [rows [{:addr 3 :content "c" :addresses nil}]
+          frame (snapshot/frame-bytes (snapshot/encode-rows rows))]
+      (is (= rows (snapshot/finalize-framed-buffer frame)))
+      (is (= [] (snapshot/finalize-framed-buffer (js/Uint8Array.)))))))
+
+(deftest transit-framed-length-test
+  (testing "framed-length sums frame sizes"
+    (let [rows1 [{:addr 1 :content "a" :addresses nil}]
+          rows2 [{:addr 2 :content "b" :addresses nil}
+                 {:addr 3 :content "c" :addresses nil}]
+          frame1 (snapshot/frame-bytes (snapshot/encode-rows rows1))
+          frame2 (snapshot/frame-bytes (snapshot/encode-rows rows2))]
+      (is (= (+ (.-byteLength frame1) (.-byteLength frame2))
+             (snapshot/framed-length [rows1 rows2]))))))

+ 9 - 6
docs/agent-guide/db-sync/protocol.md

@@ -72,12 +72,15 @@
   - Same as WS tx/batch. Body: `{"t-before":<t>,"txs":["<tx-transit>", ...]}`.
   - Response: `{"type":"tx/batch/ok","t":<t>}` or `{"type":"tx/reject","reason":...}`.
   - Error response (400): `{"error":"missing body"|"invalid tx"}`.
-- `GET /sync/:graph-id/snapshot/rows?after=<addr>&limit=<n>`
-  - Pull sqlite kvs rows. Response: `{"rows":[{"addr":<addr>,"content":"<transit>","addresses":<json|null>}...],"last-addr":<addr>,"done":true|false}`.
-- `POST /sync/:graph-id/snapshot/import`
-  - Import sqlite kvs rows. Body: `{"reset":true|false,"rows":[[addr,content,addresses]...]}`.
-  - Response: `{"ok":true,"count":<n>}`.
-  - Error response (400): `{"error":"missing body"|"invalid body"}`.
+- `GET /sync/:graph-id/snapshot/download`
+  - Build a snapshot file in R2 and return a download URL.
+  - Response: `{"ok":true,"key":"<graph-id>/<uuid>.snapshot","url":"<origin>/assets/:graph-id/<uuid>.snapshot","content-encoding":"gzip"}`.
+  - The snapshot file is a framed Transit JSON stream of kvs rows, optionally gzip-compressed.
+- `POST /sync/:graph-id/snapshot/upload?reset=true|false`
+  - Upload a snapshot stream (framed Transit JSON, optionally gzip-compressed). The server imports rows into kvs.
+  - Request body: binary stream; headers should include `content-type: application/transit+json` and `content-encoding: gzip` when compressed.
+  - Response: `{"ok":true,"count":<n>,"key":"<graph-id>/<uuid>.snapshot"}`.
+  - Error response (400): `{"error":"missing body"|"missing graph id"}`.
 - `DELETE /sync/:graph-id/admin/reset`
   - Drop/recreate per-graph tables. Response: `{"ok":true}`.
 

+ 105 - 26
src/main/frontend/handler/db_based/db_sync.cljs

@@ -30,7 +30,60 @@
   (or config/db-sync-http-base
       (ws->http-base config/db-sync-ws-url)))
 
-(def ^:private snapshot-rows-limit 2000)
+;; Shared TextDecoder for turning snapshot frame payload bytes into strings.
+(def ^:private snapshot-text-decoder (js/TextDecoder.))
+
+;; Normalize a chunk to a Uint8Array: Uint8Array passes through untouched,
+;; ArrayBuffer is wrapped, a string is UTF-8 encoded, anything else is handed
+;; to the Uint8Array constructor (array-like input).
+(defn- ->uint8 [data]
+  (cond
+    (instance? js/Uint8Array data) data
+    (instance? js/ArrayBuffer data) (js/Uint8Array. data)
+    (string? data) (.encode (js/TextEncoder.) data)
+    :else (js/Uint8Array. data)))
+
+;; Decode one frame payload: UTF-8 bytes -> Transit string -> kvs rows.
+(defn- decode-snapshot-rows [bytes]
+  (sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 bytes))))
+
+;; Read the 4-byte big-endian frame-length header located at `offset` within
+;; the Uint8Array `data`.
+;; Fix: the DataView must be positioned relative to the view's own start, i.e.
+;; (+ (.-byteOffset data) offset). `data` can be a Uint8Array view into a
+;; larger ArrayBuffer (stream chunks are not guaranteed to start at byte 0 of
+;; their backing buffer), and the previous version indexed the backing buffer
+;; directly, reading the length from the wrong location in that case.
+(defn- frame-len [^js data offset]
+  (let [view (js/DataView. (.-buffer data) (+ (.-byteOffset data) offset) 4)]
+    (.getUint32 view 0 false)))
+
+;; Concatenate two Uint8Arrays into a freshly allocated array. nil on either
+;; side is a pass-through; used to prepend the leftover tail of the previous
+;; chunk to the next one.
+(defn- concat-bytes
+  [^js a ^js b]
+  (cond
+    (nil? a) b
+    (nil? b) a
+    :else
+    (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))]
+      (.set out a 0)
+      (.set out b (.-byteLength a))
+      out)))
+
+;; Incremental frame parser. Concatenates the leftover `buffer` (may be nil)
+;; with the new `chunk`, decodes every complete frame (4-byte big-endian
+;; length prefix followed by a Transit payload), and returns
+;;   {:rows   all rows decoded from complete frames,
+;;    :buffer unconsumed trailing bytes, or nil when fully consumed}.
+;; A frame whose payload extends past the available bytes is left untouched
+;; in :buffer for the next call.
+(defn- parse-framed-chunk
+  [buffer chunk]
+  (let [data (concat-bytes buffer chunk)
+        total (.-byteLength data)]
+    (loop [offset 0
+           rows []]
+      (if (< (- total offset) 4)
+        {:rows rows
+         :buffer (when (< offset total)
+                   (.slice data offset total))}
+        (let [len (frame-len data offset)
+              next-offset (+ offset 4 len)]
+          (if (<= next-offset total)
+            (let [payload (.slice data (+ offset 4) next-offset)
+                  decoded (decode-snapshot-rows payload)]
+              (recur next-offset (into rows decoded)))
+            {:rows rows
+             :buffer (.slice data offset total)}))))))
+
+;; Drain whatever remains in the reassembly buffer at end-of-stream and return
+;; its decoded rows; [] when the buffer is nil/empty. Throws when the trailing
+;; bytes do not form a complete frame (truncated download).
+;; NOTE(review): a frame that decodes to zero rows would also be reported as
+;; "incomplete" here (rows is empty); the upload side closes the stream
+;; instead of emitting empty frames, so this should be unreachable — confirm.
+(defn- finalize-framed-buffer
+  [buffer]
+  (if (or (nil? buffer) (zero? (.-byteLength buffer)))
+    []
+    (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)]
+      (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer))))
+        rows
+        (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows}))))))
 
 (defn- auth-headers []
   (when-let [token (state/get-auth-id-token)]
@@ -176,31 +229,57 @@
   (state/set-state! :rtc/downloading-graph-uuid graph-uuid)
   (let [base (http-base)]
     (-> (if (and graph-uuid base)
-          (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
-                  graph (str config/db-version-prefix graph-name)]
-            (p/loop [after -1           ; root addr is 0
-                     first-batch? true]
-              (p/let [pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
-                                            {:method "GET"}
-                                            {:response-schema :sync/pull})
-                      remote-tx (:t pull-resp)
-                      _ (when-not (integer? remote-tx)
-                          (throw (ex-info "non-integer remote-tx when downloading graph"
-                                          {:graph graph-name
-                                           :remote-tx remote-tx})))
-                      resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/rows"
-                                            "?after=" after "&limit=" snapshot-rows-limit)
-                                       {:method "GET"}
-                                       {:response-schema :sync/snapshot-rows})
-                      rows (:rows resp)
-                      done? (true? (:done resp))
-                      last-addr (or (:last-addr resp) after)]
-                (p/do!
-                 (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
-                                          graph rows first-batch?)
-                 (if done?
-                   (state/<invoke-db-worker :thread-api/db-sync-finalize-kvs-import graph remote-tx)
-                   (p/recur last-addr false))))))
+          (let [download-url* (atom nil)]
+            (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
+                        graph (str config/db-version-prefix graph-name)
+                        pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
+                                              {:method "GET"}
+                                              {:response-schema :sync/pull})
+                        remote-tx (:t pull-resp)
+                        _ (when-not (integer? remote-tx)
+                            (throw (ex-info "non-integer remote-tx when downloading graph"
+                                            {:graph graph-name
+                                             :remote-tx remote-tx})))
+                        download-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download")
+                                                  {:method "GET"}
+                                                  {:response-schema :sync/snapshot-download})
+                        download-url (:url download-resp)
+                        _ (reset! download-url* download-url)
+                        _ (when-not (string? download-url)
+                            (throw (ex-info "missing snapshot download url"
+                                            {:graph graph-name
+                                             :response download-resp})))
+                        resp (js/fetch download-url (clj->js (with-auth-headers {:method "GET"})))]
+                  (when-not (.-ok resp)
+                    (throw (ex-info "snapshot download failed"
+                                    {:graph graph-name
+                                     :status (.-status resp)})))
+                  (when-not (.-body resp)
+                    (throw (ex-info "snapshot download missing body"
+                                    {:graph graph-name})))
+                  (p/let [reader (.getReader (.-body resp))]
+                    (p/loop [buffer nil
+                             total 0
+                             total-rows []]
+                      (p/let [chunk (.read reader)]
+                        (if (.-done chunk)
+                          (let [rows (finalize-framed-buffer buffer)
+                                total' (+ total (count rows))
+                                total-rows' (into total-rows rows)]
+                            (when (seq total-rows')
+                              (p/do!
+                               (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
+                                                        graph total-rows' true)
+                               (state/<invoke-db-worker :thread-api/db-sync-finalize-kvs-import graph remote-tx)))
+                            total')
+                          (let [value (.-value chunk)
+                                {:keys [rows buffer]} (parse-framed-chunk buffer value)
+                                total' (+ total (count rows))]
+                            (p/recur buffer total' (into total-rows rows))))))))
+                (p/finally
+                  (fn []
+                    (when-let [download-url @download-url*]
+                      (js/fetch download-url (clj->js (with-auth-headers {:method "DELETE"}))))))))
           (p/rejected (ex-info "db-sync missing graph info"
                                {:type :db-sync/invalid-graph
                                 :graph-uuid graph-uuid

+ 77 - 32
src/main/frontend/worker/db_sync.cljs

@@ -129,6 +129,9 @@
 
 (def ^:private max-asset-size (* 100 1024 1024))
 (def ^:private upload-kvs-batch-size 2000)
+;; Snapshot uploads are sent as framed Transit payloads and gzip-compressed
+;; when the platform provides CompressionStream.
+(def ^:private snapshot-content-type "application/transit+json")
+(def ^:private snapshot-content-encoding "gzip")
+(def ^:private snapshot-text-encoder (js/TextEncoder.))
 (def ^:private reconnect-base-delay-ms 1000)
 (def ^:private reconnect-max-delay-ms 30000)
 (def ^:private reconnect-jitter-ms 250)
@@ -996,43 +999,85 @@
 (defn- normalize-snapshot-rows [rows]
   (mapv (fn [row] (vec row)) (array-seq rows)))
 
+;; Encode one batch of kvs rows as UTF-8 Transit bytes (a single frame payload).
+(defn- encode-snapshot-rows [rows]
+  (.encode snapshot-text-encoder (sqlite-util/write-transit-str rows)))
+
+;; Prefix `bytes` with a 4-byte big-endian length header — the frame format
+;; the client-side download parser expects. `out` is freshly allocated, so
+;; indexing its backing buffer from 0 via DataView is safe here.
+(defn- frame-bytes [^js bytes]
+  (let [len (.-byteLength bytes)
+        out (js/Uint8Array. (+ 4 len))
+        view (js/DataView. (.-buffer out))]
+    (.setUint32 view 0 len false)
+    (.set out bytes 4)
+    out))
+
+;; Pull-based ReadableStream over the sqlite kvs table: each pull fetches the
+;; next batch of up to upload-kvs-batch-size rows after the last emitted addr,
+;; frames the batch as Transit, and enqueues it. A short batch marks :done? so
+;; the following pull closes the stream; an empty first batch closes it
+;; immediately.
+;; NOTE(review): fetch-kvs-rows is used synchronously here (as it was in the
+;; previous upload loop) — confirm it never returns a promise.
+(defn- snapshot-upload-stream [db]
+  (let [state (volatile! {:after -1 :done? false})]
+    (js/ReadableStream.
+     #js {:pull (fn [controller]
+                  (p/let [{:keys [after done?]} @state]
+                    (if done?
+                      (.close controller)
+                      (let [rows (fetch-kvs-rows db after upload-kvs-batch-size)]
+                        (if (empty? rows)
+                          (.close controller)
+                          (let [rows (normalize-snapshot-rows rows)
+                                last-addr (apply max (map first rows))
+                                done? (< (count rows) upload-kvs-batch-size)
+                                payload (encode-snapshot-rows rows)
+                                framed (frame-bytes payload)]
+                            (.enqueue controller framed)
+                            (vswap! state assoc :after last-addr :done? done?)))))))})))
+
+;; Pipe `stream` through gzip CompressionStream when the platform supports it;
+;; otherwise return the stream unchanged.
+(defn- maybe-compress-stream [stream]
+  (if (exists? js/CompressionStream)
+    (.pipeThrough stream (js/CompressionStream. "gzip"))
+    stream))
+
+;; True when `base` is a plain-http localhost/127.0.0.1 URL. upload-graph!
+;; buffers the snapshot body fully in memory for such bases instead of
+;; streaming it — presumably because local dev servers don't accept streaming
+;; request bodies (TODO confirm). Non-strings and malformed URLs yield
+;; nil/false.
+(defn- should-buffer-snapshot-upload?
+  [base]
+  (when (string? base)
+    (try
+      (let [url (js/URL. base)
+            host (.-hostname url)]
+        (and (= "http:" (.-protocol url))
+             (contains? #{"localhost" "127.0.0.1"} host)))
+      (catch :default _
+        false))))
+
+;; Materialize a ReadableStream into a single ArrayBuffer by wrapping it in a
+;; Response and awaiting .arrayBuffer.
+(defn- <buffer-stream
+  [stream]
+  (p/let [resp (js/Response. stream)
+          buf (.arrayBuffer resp)]
+    buf))
+
 (defn upload-graph!
   [repo]
   (let [base (http-base-url)
         graph-id (get-graph-id repo)]
-    (if-not (and (seq base) (seq graph-id))
-      (p/rejected (ex-info "db-sync missing upload info"
-                           {:repo repo :base base :graph-id graph-id}))
+    (if (and (seq base) (seq graph-id))
       (if-let [db (worker-state/get-sqlite-conn repo :db)]
         (do
           (ensure-client-graph-uuid! repo graph-id)
-          (p/loop [last-addr -1
-                   first-batch? true]
-            (let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
-              (if (empty? rows)
-                (let [body (coerce-http-request :sync/snapshot-import {:reset false :rows []})]
-                  (if (nil? body)
-                    (p/rejected (ex-info "db-sync invalid snapshot body"
-                                         {:repo repo :graph-id graph-id}))
-                    (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
-                                          {:method "POST"
-                                           :headers {"content-type" "application/json"}
-                                           :body (js/JSON.stringify (clj->js body))}
-                                          {:response-schema :sync/snapshot-import})]
-                      (client-op/add-all-exists-asset-as-ops repo)
-                      {:graph-id graph-id})))
-                (let [max-addr (apply max (map first rows))
-                      rows (normalize-snapshot-rows rows)
-                      body (coerce-http-request :sync/snapshot-import {:reset first-batch?
-                                                                       :rows rows})]
-                  (if (nil? body)
-                    (p/rejected (ex-info "db-sync invalid snapshot body"
-                                         {:repo repo :graph-id graph-id}))
-                    (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
-                                          {:method "POST"
-                                           :headers {"content-type" "application/json"}
-                                           :body (js/JSON.stringify (clj->js body))}
-                                          {:response-schema :sync/snapshot-import})]
-                      (p/recur max-addr false))))))))
+          ;; Stream the whole kvs table to the snapshot upload endpoint as
+          ;; framed Transit, gzipped when CompressionStream exists.
+          ;; reset=true asks the server to replace existing rows for this graph.
+          (let [stream (snapshot-upload-stream db)
+                use-compression? (exists? js/CompressionStream)
+                body (if use-compression? (maybe-compress-stream stream) stream)
+                headers (cond-> {"content-type" snapshot-content-type}
+                          use-compression? (assoc "content-encoding" snapshot-content-encoding))
+                upload-url (str base "/sync/" graph-id "/snapshot/upload?reset=true")
+                ;; :duplex "half" is required by fetch for streaming request bodies.
+                upload-opts {:method "POST"
+                             :headers headers
+                             :body body
+                             :duplex "half"}
+                fallback? (should-buffer-snapshot-upload? base)
+                do-upload (fn [opts]
+                            (fetch-json upload-url opts {:response-schema :sync/snapshot-upload}))]
+            ;; NOTE(review): the fallback path buffers the entire (compressed)
+            ;; snapshot in memory for plain-http localhost bases — presumably a
+            ;; dev-server limitation on streaming bodies; confirm.
+            (p/let [_ (if fallback?
+                        (p/let [buf (<buffer-stream body)]
+                          (do-upload (assoc upload-opts :body buf)))
+                        (do-upload upload-opts))]
+              (client-op/add-all-exists-asset-as-ops repo)
+              {:graph-id graph-id})))
         (p/rejected (ex-info "db-sync missing sqlite db"
-                             {:repo repo :graph-id graph-id}))))))
+                             {:repo repo :graph-id graph-id})))
+      (p/rejected (ex-info "db-sync missing upload info"
+                           {:repo repo :base base :graph-id graph-id})))))

+ 1 - 1
src/main/frontend/worker/db_worker.cljs

@@ -124,7 +124,7 @@
 
 (defn- rows->sqlite-binds
   [rows]
-  (mapv (fn [{:keys [addr content addresses]}]
+  (mapv (fn [[addr content addresses]]
           #js {:$addr addr
                :$content content
                :$addresses addresses})