Răsfoiți Sursa

enhance(db-sync): move graph download to worker and stream kv snapshots

Tienson Qin 1 săptămână în urmă
părinte
comite
b3e400c2ae

+ 1 - 1
deps/db-sync/src/logseq/db_sync/common.cljs

@@ -8,7 +8,7 @@
   #js {"Access-Control-Allow-Origin" "*"
        "Access-Control-Allow-Headers" "content-type,content-encoding,authorization,x-amz-meta-checksum,x-amz-meta-type"
        "Access-Control-Allow-Methods" "GET,POST,PUT,DELETE,OPTIONS,HEAD"
-       "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-asset-size,x-snapshot-datom-count"})
+       "Access-Control-Expose-Headers" "content-type,content-encoding,content-length,cache-control,x-asset-type,x-asset-size,x-snapshot-row-count"})
 
 (defn json-response
   ([data] (json-response data 200))

+ 44 - 57
deps/db-sync/src/logseq/db_sync/worker/handler/sync.cljs

@@ -13,12 +13,11 @@
             [logseq.db-sync.worker.routes.sync :as sync-routes]
             [logseq.db-sync.worker.ws :as ws]
             [logseq.db.frontend.schema :as db-schema]
-            [promesa.core :as p]
-            [logseq.db-sync.checksum :as checksum]))
+            [promesa.core :as p]))
 
 (def ^:private snapshot-download-batch-size 10000)
 (def ^:private snapshot-cache-control "private, max-age=300")
-(def ^:private snapshot-content-type "application/x-ndjson")
+(def ^:private snapshot-content-type "application/transit+json")
 (def ^:private snapshot-content-encoding "gzip")
 (def ^:private snapshot-uploading-meta-key :snapshot-uploading?)
 ;; 10m
@@ -56,18 +55,7 @@
 
 (defn current-checksum [^js self]
   (ensure-conn! self)
-  (let [db @(.-conn self)
-        full-checksum (checksum/recompute-checksum db)
-        cur-checksum (storage/get-checksum (.-sql self))]
-    (if (or (nil? cur-checksum)
-            (= full-checksum cur-checksum))
-      cur-checksum
-      (do
-        (log/error :db-sync/server-checksum-mismatch
-                   {:full-checksum full-checksum
-                    :current-checksum cur-checksum})
-        (storage/set-checksum! (.-sql self) full-checksum)
-        full-checksum))))
+  (storage/get-checksum (.-sql self)))
 
 (defn snapshot-upload-finished? [^js self]
   (ensure-schema! self)
@@ -174,47 +162,48 @@
       (.set out b (.-byteLength a))
       out)))
 
-(defn- snapshot-datom->jsonl-datom
-  [datom]
-  {:e (:e datom)
-   :a (:a datom)
-   :v (:v datom)
-   :tx (:tx datom)
-   :added (:added datom)})
-
-(defn- snapshot-datom-count
-  [conn]
-  (count (d/datoms @conn :eavt)))
-
-(defn- snapshot-export-datoms
-  [conn]
-  (let [db @conn
-        schema-version-eid (some-> (d/entity db :logseq.kv/schema-version) :db/id)
-        ident-eids (into #{}
-                         (map :e)
-                         (d/datoms db :avet :db/ident))
-        jsonl-datoms (fn [pred]
-                       (sequence
-                        (comp (filter pred)
-                              (map snapshot-datom->jsonl-datom))
-                        (d/datoms db :eavt)))]
-    (concat (jsonl-datoms #(= schema-version-eid (:e %)))
-            (jsonl-datoms #(and (contains? ident-eids (:e %))
-                                (not= schema-version-eid (:e %))))
-            (jsonl-datoms #(not (contains? ident-eids (:e %)))))))
+(defn- frame-bytes
+  [^js data]
+  (let [len (.-byteLength data)
+        out (js/Uint8Array. (+ 4 len))
+        view (js/DataView. (.-buffer out))]
+    (.setUint32 view 0 len false)
+    (.set out data 4)
+    out))
+
+(defn- fetch-snapshot-kvs-rows
+  [sql last-addr limit]
+  (let [rows (common/get-sql-rows
+              (common/sql-exec sql
+                               "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
+                               last-addr
+                               limit))]
+    (mapv (fn [row]
+            [(aget row "addr")
+             (aget row "content")
+             (aget row "addresses")])
+          rows)))
+
+(defn- snapshot-row-count
+  [sql]
+  (if-let [row (first (common/get-sql-rows
+                       (common/sql-exec sql "select count(*) as row_count from kvs")))]
+    (or (aget row "row_count") 0)
+    0))
 
 (defn- snapshot-export-stream [^js self]
-  (ensure-conn! self)
-  (let [remaining (volatile! (seq (snapshot-export-datoms (.-conn self))))]
+  (ensure-schema! self)
+  (let [sql (.-sql self)
+        last-addr (volatile! -1)]
     (js/ReadableStream.
-     #js {:pull (fn [controller]
-                  (let [batch (vec (take snapshot-download-batch-size @remaining))]
-                    (if (empty? batch)
-                      (.close controller)
-                      (let [remaining' (drop snapshot-download-batch-size @remaining)
-                            payload (snapshot/encode-datoms-jsonl batch)]
-                        (vreset! remaining (seq remaining'))
-                        (.enqueue controller payload)))))})))
+     (clj->js
+      {:pull (fn [controller]
+               (let [batch (fetch-snapshot-kvs-rows sql @last-addr snapshot-download-batch-size)]
+                 (if (empty? batch)
+                   (.close controller)
+                   (let [payload (snapshot/encode-rows batch)]
+                     (vreset! last-addr (first (peek batch)))
+                     (.enqueue controller (frame-bytes payload))))))}))))
 
 (defn- upload-multipart!
   [^js bucket key stream opts]
@@ -419,15 +408,13 @@
       (http/bad-request "missing graph id")
       (let [stream (-> (snapshot-export-stream self)
                        (maybe-compress-stream))
-            conn (or (.-conn self)
-                     (do (ensure-conn! self) (.-conn self)))
-            datom-count (snapshot-datom-count conn)]
+            row-count (snapshot-row-count (.-sql self))]
         (js/Response. stream
                       #js {:status 200
                            :headers (js/Object.assign
                                      #js {"content-type" snapshot-content-type
                                           "content-encoding" snapshot-content-encoding}
-                                     #js {"x-snapshot-datom-count" (str datom-count)}
+                                     #js {"x-snapshot-row-count" (str row-count)}
                                      (common/cors-headers))})))))
 
 (defn- handle-sync-snapshot-download

+ 45 - 35
deps/db-sync/test/logseq/db_sync/worker_handler_sync_test.cljs

@@ -30,47 +30,49 @@
 (deftest snapshot-download-uses-gzip-encoding-when-compression-supported-test
   (async done
          (let [put-call (atom nil)
+               rows [[1 "row-1" nil]
+                     [2 "row-2" "{\"a\":1}"]]
                bucket #js {:put (fn [key body opts]
                                   (reset! put-call {:key key :body body :opts opts})
                                   (js/Promise.resolve #js {:ok true}))}
+               sql (empty-sql)
                conn (d/create-conn db-schema/schema)
                self #js {:env #js {:LOGSEQ_SYNC_ASSETS bucket}
                          :conn conn
                          :schema-ready true
-                         :sql (empty-sql)}
+                         :sql sql}
                {:keys [request url]} (request-url)
                original-compression-stream (.-CompressionStream js/globalThis)
                restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
-           (d/transact! conn [{:db/ident :logseq.class/Page
-                               :block/title "Page"}
-                              {:db/ident :logseq.kv/schema-version
-                               :kv/value {:major 65 :minor 23}}
-                              {:db/id 2 :block/title "hello"}])
            (aset js/globalThis
                  "CompressionStream"
                  (passthrough-compression-stream-constructor))
-           (-> (p/let [resp (sync-handler/handle {:self self
-                                                  :request request
-                                                  :url url
-                                                  :route {:handler :sync/snapshot-download}})
+           (-> (p/with-redefs [sync-handler/fetch-snapshot-kvs-rows (fn [_sql last-addr _limit]
+                                                                      (if (neg? last-addr) rows []))
+                               sync-handler/snapshot-row-count (fn [_sql] (count rows))]
+                 (p/let [resp (sync-handler/handle {:self self
+                                                    :request request
+                                                    :url url
+                                                    :route {:handler :sync/snapshot-download}})
                        text (.text resp)
                        body (js->clj (js/JSON.parse text) :keywordize-keys true)
                        http-metadata (aget (:opts @put-call) "httpMetadata")
                        payload (js/Uint8Array. (:body @put-call))
-                       {:keys [datoms]} (snapshot/parse-datoms-jsonl-chunk nil payload)]
+                       rows (snapshot/finalize-framed-buffer payload)
+                       addrs (mapv first rows)]
                  (is (= 200 (.-status resp)))
                  (is (= "gzip" (:content-encoding body)))
                  (is (= "gzip" (aget http-metadata "contentEncoding")))
-                 (is (= "application/x-ndjson" (aget http-metadata "contentType")))
-                 (is (= 5 (count datoms)))
-                 (is (= [:logseq.kv/schema-version
-                         :logseq.kv/schema-version
-                         :logseq.kv/schema-version
-                         :logseq.class/Page
-                         :logseq.class/Page]
-                        (mapv (fn [{:keys [e]}]
-                                (:db/ident (d/entity @conn e)))
-                              datoms))))
+                 (is (= "application/transit+json" (aget http-metadata "contentType")))
+                 (is (= 2 (count rows)))
+                 (is (= (sort addrs) addrs))
+                 (is (every? (fn [[addr content _addresses]]
+                               (and (int? addr)
+                                    (string? content)))
+                             rows))
+                 (is (= [[1 "row-1" nil]
+                         [2 "row-2" "{\"a\":1}"]]
+                        rows))))
                (p/then (fn []
                          (restore!)
                          (done)))
@@ -79,36 +81,44 @@
                           (is false (str error))
                           (done)))))))
 
-(deftest snapshot-download-stream-route-returns-jsonl-datoms-test
+(deftest snapshot-download-stream-route-returns-framed-kvs-rows-test
   (async done
-         (let [conn (d/create-conn db-schema/schema)
+         (let [rows [[1 "row-1" nil]
+                     [2 "row-2" nil]]
+               sql (empty-sql)
+               conn (d/create-conn db-schema/schema)
                self #js {:env #js {}
                          :conn conn
                          :schema-ready true
-                         :sql (empty-sql)}
+                         :sql sql}
                {:keys [request]} (request-url "/sync/graph-1/snapshot/stream?graph-id=graph-1")
                original-compression-stream (.-CompressionStream js/globalThis)
                restore! #(aset js/globalThis "CompressionStream" original-compression-stream)]
-           (d/transact! conn [{:db/ident :logseq.class/Page
-                               :block/title "Page"}
-                              {:db/ident :logseq.kv/schema-version
-                               :kv/value {:major 65 :minor 23}}
-                              {:db/id 2 :block/title "hello"}])
            (aset js/globalThis
                  "CompressionStream"
                  (passthrough-compression-stream-constructor))
-           (-> (p/let [resp (sync-handler/handle-http self request)
+           (-> (p/with-redefs [sync-handler/fetch-snapshot-kvs-rows (fn [_sql last-addr _limit]
+                                                                      (if (neg? last-addr) rows []))
+                               sync-handler/snapshot-row-count (fn [_sql] (count rows))]
+                 (p/let [resp (sync-handler/handle-http self request)
                        encoding (.get (.-headers resp) "content-encoding")
                        content-type (.get (.-headers resp) "content-type")
                        buf (.arrayBuffer resp)
                        payload (js/Uint8Array. buf)
-                       datoms (snapshot/finalize-datoms-jsonl-buffer payload)]
+                       rows (snapshot/finalize-framed-buffer payload)
+                       addrs (mapv first rows)]
                  (is (= 200 (.-status resp)))
                  (is (= "gzip" encoding))
-                 (is (= "application/x-ndjson" content-type))
-                 (is (= 5 (count datoms)))
-                 (is (= :logseq.kv/schema-version
-                        (:db/ident (d/entity @conn (:e (first datoms)))))))
+                 (is (= "application/transit+json" content-type))
+                 (is (= 2 (count rows)))
+                 (is (= (sort addrs) addrs))
+                 (is (every? (fn [[addr content _addresses]]
+                               (and (int? addr)
+                                    (string? content)))
+                             rows))
+                 (is (= [[1 "row-1" nil]
+                         [2 "row-2" nil]]
+                        rows))))
                (p/then (fn []
                          (restore!)
                          (done)))

+ 3 - 1
deps/db/src/logseq/db.cljs

@@ -100,7 +100,9 @@
   [conn]
   (when-some [_storage (storage/storage @conn)]
     (when-not (:batch-tx? @conn)
-      (let [f (or @*debounce-fn d/store)]
+      (let [f (if (exists? js/process)
+                d/store
+                (or @*debounce-fn d/store))]
         (f @conn)))))
 
 (defn- transact-sync

+ 58 - 0
docs/adr/0014-kv-row-r2-snapshot-download.md

@@ -0,0 +1,58 @@
+# ADR 0014: KV-Row R2 Snapshot Download With Worker-Owned Low-Memory Import
+
+Date: 2026-04-01
+Status: Proposed
+
+## Context
+Snapshot download previously exported Datascript datoms as gzip NDJSON from the
+server and parsed/transacted datoms on the client main-thread handler path.
+
+That design had two issues:
+
+1. Server snapshot export walked the full datom set, spending avoidable CPU and memory.
+2. Client download logic lived in handler code and was not aligned with worker
+   ownership for large-graph import.
+
+We already use framed Transit `kvs` rows for snapshot upload. Download should
+converge on the same wire format.
+
+## Decision
+1. `GET /sync/:graph-id/snapshot/download` and `/snapshot/stream` export framed
+   Transit `kvs` rows (`[addr content addresses]`) instead of datom NDJSON.
+2. Snapshot download payload content-type is `application/transit+json`
+   (gzip-compressed when available).
+3. Server snapshot export reads directly from sqlite `kvs` rows in ascending
+   `addr` batches and streams framed payloads to response/R2.
+4. Graph snapshot download orchestration is moved to
+   `frontend.worker.sync.download` and invoked from db-worker thread API.
+5. Handler code delegates graph download to worker API instead of parsing
+   snapshot payloads directly.
+6. Client import adds a row-chunk API (`:thread-api/db-sync-import-rows-chunk`).
+   Row batches are staged in a temporary sqlite database, then replayed into the
+   target conn in schema-first order.
+7. Replay order must transact schema-critical datoms before regular data:
+   - `:logseq.kv/schema-version` entity datoms
+   - attribute-definition datoms (`:db/ident` and `:db/*` metadata such as
+     `:db/valueType`, `:db/cardinality`, `:db/unique`, `:db/isComponent`)
+   - all remaining datoms
+
+## Consequences
+
+### Positive
+- Lower server CPU/memory for snapshot export (no datom NDJSON generation).
+- Download/upload snapshot format is unified around framed `kvs` rows.
+- Download pipeline ownership moves to worker sync module.
+- Schema-first replay protects index/schema correctness for large imports.
+
+### Tradeoffs
+- Client still performs datom replay during finalize to rebuild a consistent
+  target store, so the import cost shifts to the worker's finalize phase.
+- Adds temp sqlite staging and one additional import path (`rows` alongside
+  legacy datom chunk path).
+
+## Verification
+- Server tests assert snapshot download/stream return framed kv rows with
+  transit content-type and sorted addresses.
+- Handler tests assert graph download delegates to worker API and maintains
+  download-state lifecycle.
+- Worker tests assert rows-chunk API wiring and schema-first import ordering.

+ 1 - 1
docs/agent-guide/db-sync/protocol.md

@@ -99,7 +99,7 @@
   - Build a snapshot file in R2 and return a download URL.
   - Response: `{"ok":true,"key":"<graph-id>/<uuid>.snapshot","url":"<origin>/assets/:graph-id/<uuid>.snapshot","content-encoding":"gzip"}`.
   - Error response (409): `{"error":"graph not ready"}` when bootstrap upload/import has not finished.
-  - The snapshot file stored in R2 is a gzip-compressed NDJSON stream of full Datascript datoms. Each line is a Transit JSON datom map: `{e,a,v,tx,added}`.
+  - The snapshot file stored in R2 is a framed Transit stream of sqlite `kvs` rows (`[addr, content, addresses]`), optionally gzip-compressed.
 - `POST /sync/:graph-id/snapshot/upload?reset=true|false`
   - Upload a snapshot stream for bootstrap import. Current upload format remains framed Transit JSON kvs rows, optionally gzip-compressed.
   - Request body: binary stream; headers should include `content-type: application/transit+json` and `content-encoding: gzip` when compressed.

+ 5 - 139
src/main/frontend/handler/db_based/sync.cljs

@@ -11,7 +11,6 @@
             [lambdaisland.glogi :as log]
             [logseq.db :as ldb]
             [logseq.db-sync.malli-schema :as db-sync-schema]
-            [logseq.db-sync.snapshot :as snapshot]
             [logseq.db.sqlite.util :as sqlite-util]
             [promesa.core :as p]))
 
@@ -32,96 +31,6 @@
   (or config/db-sync-http-base
       (ws->http-base config/db-sync-ws-url)))
 
-(defn- ->uint8 [data]
-  (cond
-    (instance? js/Uint8Array data) data
-    (instance? js/ArrayBuffer data) (js/Uint8Array. data)
-    (string? data) (.encode (js/TextEncoder.) data)
-    :else (js/Uint8Array. data)))
-
-(defn- gzip-bytes?
-  [^js payload]
-  (and (some? payload)
-       (>= (.-byteLength payload) 2)
-       (= 31 (aget payload 0))
-       (= 139 (aget payload 1))))
-
-(defn- bytes->stream
-  [^js payload]
-  (js/ReadableStream.
-   #js {:start (fn [controller]
-                 (.enqueue controller payload)
-                 (.close controller))}))
-
-(defn- <decompress-gzip-bytes
-  [^js payload]
-  (if (exists? js/DecompressionStream)
-    (p/let [stream (bytes->stream payload)
-            decompressed (.pipeThrough stream (js/DecompressionStream. "gzip"))
-            resp (js/Response. decompressed)
-            buf (.arrayBuffer resp)]
-      (->uint8 buf))
-    (p/rejected (ex-info "gzip decompression not supported"
-                         {:type :db-sync/decompression-not-supported}))))
-
-(defn- <snapshot-response-bytes
-  [^js resp]
-  (p/let [buf (.arrayBuffer resp)
-          chunk (->uint8 buf)]
-    (if (gzip-bytes? chunk)
-      (<decompress-gzip-bytes chunk)
-      chunk)))
-
-(defn- response-body-stream
-  [^js resp]
-  (let [encoding (some-> resp .-headers (.get "content-encoding"))]
-    (cond
-      (nil? (.-body resp))
-      nil
-
-      (= "gzip" encoding)
-      (when (exists? js/DecompressionStream)
-        (.pipeThrough (.-body resp) (js/DecompressionStream. "gzip")))
-
-      :else
-      (.-body resp))))
-
-(defn- <flush-datom-batches!
-  [datoms batch-size on-batch]
-  (p/loop [remaining datoms]
-    (if (>= (count remaining) batch-size)
-      (let [batch (subvec remaining 0 batch-size)
-            rest-datoms (subvec remaining batch-size)]
-        (p/let [_ (on-batch batch)]
-          (p/recur rest-datoms)))
-      remaining)))
-
-(defn- <stream-snapshot-datom-batches!
-  [^js resp batch-size on-batch]
-  (if-let [stream (response-body-stream resp)]
-    (let [reader (.getReader stream)]
-      (p/loop [buffer nil
-               pending []]
-        (p/let [result (.read reader)]
-          (if (.-done result)
-            (let [pending (if (and buffer (pos? (.-byteLength buffer)))
-                            (into pending (snapshot/finalize-datoms-jsonl-buffer buffer))
-                            pending)]
-              (if (seq pending)
-                (p/let [_ (on-batch pending)]
-                  {:chunk-count 1})
-                {:chunk-count 0}))
-            (let [{datoms :datoms next-buffer :buffer} (snapshot/parse-datoms-jsonl-chunk buffer (->uint8 (.-value result)))
-                  pending (into pending datoms)]
-              (p/let [pending (<flush-datom-batches! pending batch-size on-batch)]
-                (p/recur next-buffer pending)))))))
-    (p/let [snapshot-bytes (<snapshot-response-bytes resp)
-            datoms (vec (snapshot/finalize-datoms-jsonl-buffer snapshot-bytes))]
-      (if (seq datoms)
-        (p/let [_ (on-batch datoms)]
-          {:chunk-count 1})
-        {:chunk-count 0}))))
-
 (defn- auth-headers []
   (when-let [token (state/get-auth-id-token)]
     {"authorization" (str "Bearer " token)}))
@@ -364,54 +273,11 @@
    (let [graph-e2ee? (normalize-graph-e2ee? graph-e2ee?)
          base (http-base)]
      (-> (if (and graph-uuid base)
-           (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
-                       graph (str config/db-version-prefix graph-name)
-                       pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
-                                             {:method "GET"}
-                                             {:response-schema :sync/pull})
-                       remote-tx (:t pull-resp)
-                       _ (when-not (integer? remote-tx)
-                           (throw (ex-info "non-integer remote-tx when downloading graph"
-                                           {:graph graph-name
-                                            :remote-tx remote-tx})))
-                       snapshot-resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/download")
-                                                 {:method "GET"}
-                                                 {:response-schema :sync/snapshot-download})
-                       resp (js/fetch (:url snapshot-resp)
-                                      (clj->js (with-auth-headers {:method "GET"})))
-                       _ (state/pub-event!
-                          [:rtc/log {:type :rtc.log/download
-                                     :sub-type :download-progress
-                                     :graph-uuid graph-uuid
-                                     :message "Start downloading graph snapshot"}])]
-                 (when-not (.-ok resp)
-                   (throw (ex-info "snapshot download failed"
-                                   {:graph graph-name
-                                    :status (.-status resp)})))
-                 (let [import-id* (atom nil)
-                       ensure-import! (fn []
-                                        (if-let [import-id @import-id*]
-                                          (p/resolved import-id)
-                                          (p/let [{:keys [import-id]} (state/<invoke-db-worker :thread-api/db-sync-import-prepare
-                                                                                               graph true graph-uuid graph-e2ee?)]
-                                            (reset! import-id* import-id)
-                                            import-id)))]
-                   (p/let [_ (<stream-snapshot-datom-batches!
-                              resp
-                              25000
-                              (fn [datoms]
-                                (p/let [import-id (ensure-import!)]
-                                  (state/<invoke-db-worker :thread-api/db-sync-import-datoms-chunk
-                                                           datoms graph-uuid import-id))))
-                           _ (state/pub-event!
-                              [:rtc/log {:type :rtc.log/download
-                                         :sub-type :download-completed
-                                         :graph-uuid graph-uuid
-                                         :message "Graph snapshot downloaded"}])
-                           _ (when-let [import-id @import-id*]
-                               (state/<invoke-db-worker :thread-api/db-sync-import-finalize
-                                                        graph graph-uuid remote-tx import-id))]
-                     true))))
+           (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
+                   graph (str config/db-version-prefix graph-name)
+                   _ (state/<invoke-db-worker :thread-api/db-sync-download-graph
+                                              graph graph-uuid graph-e2ee?)]
+             true)
            (p/rejected (ex-info "db-sync missing graph info"
                                 {:type :db-sync/invalid-graph
                                  :graph-uuid graph-uuid

+ 150 - 8
src/main/frontend/worker/db_worker.cljs

@@ -28,6 +28,7 @@
             [frontend.worker.sync.asset-db-listener]
             [frontend.worker.sync.client-op :as client-op]
             [frontend.worker.sync.crypt :as sync-crypt]
+            [frontend.worker.sync.download :as sync-download]
             [frontend.worker.sync.log-and-state :as rtc-log-and-state]
             [frontend.worker.thread-atom]
             [frontend.worker.undo-redo :as worker-undo-redo]
@@ -505,6 +506,18 @@
   [repo]
   (db-sync/upload-graph! repo))
 
+(def-thread-api :thread-api/db-sync-download-graph
+  [repo graph-id graph-e2ee?]
+  (sync-download/download-graph-snapshot!
+   repo
+   graph-id
+   graph-e2ee?
+   {:prepare-f (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
+    :import-rows-f (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk)
+    :finalize-f (@thread-api/*thread-apis :thread-api/db-sync-import-finalize)
+    :log-f (fn [payload]
+             (rtc-log-and-state/rtc-log :rtc.log/download payload))}))
+
 (def-thread-api :thread-api/set-infer-worker-proxy
   [infer-worker-proxy]
   (reset! worker-state/*infer-worker infer-worker-proxy)
@@ -740,6 +753,7 @@
 
 ;; Chunked import state - held between prepare/chunk/finalize calls
 (defonce ^:private *import-state (atom nil))
+(def ^:private snapshot-import-datoms-batch-size 10000)
 
 (defn- stale-import-ex-info
   [repo graph-id import-id]
@@ -749,16 +763,26 @@
             :graph-id graph-id
             :import-id import-id}))
 
+(defn- <remove-import-temp-db-file!
+  [repo path]
+  (-> (p/let [^js root (.getDirectory js/navigator.storage)
+              ^js dir (.getDirectoryHandle root (str "." (worker-util/get-pool-name repo)))]
+        (.removeEntry dir (subs path 1)))
+      (p/catch
+       (fn [error]
+         (if (= "NotFoundError" (.-name error))
+           nil
+           (p/rejected error))))))
+
 (defn- close-import-state!
-  [{:keys [db import-pool]}]
-  (when db
+  [{:keys [repo rows-db rows-path]}]
+  (when rows-db
     (try
-      (.close db)
+      (.close rows-db)
       (catch :default _)))
-  (when import-pool
-    (try
-      (remove-vfs! import-pool)
-      (catch :default _))))
+  (when (and repo rows-path)
+    (-> (<remove-import-temp-db-file! repo rows-path)
+        (p/catch (fn [_] nil)))))
 
 (defn- clear-import-state!
   [import-id]
@@ -781,6 +805,17 @@
   [{:keys [e a v]}]
   [:db/add e a v])
 
+(defn- <create-import-temp-db!
+  [repo]
+  (p/let [^js pool (<get-opfs-pool repo)
+          _ (when-not pool
+              (db-sync/fail-fast :db-sync/missing-field {:repo repo :field :opfs-pool}))
+          path (str "/download-import-" (random-uuid) ".sqlite")
+          ^js db (new (.-OpfsSAHPoolDb pool) path)]
+    (common-sqlite/create-kvs-table! db)
+    {:rows-db db
+     :rows-path path}))
+
 (defn- import-datoms-batch!
   [conn aes-key graph-e2ee? datoms]
   (p/let [datoms-batch (if graph-e2ee?
@@ -796,6 +831,92 @@
     (when (seq tx-data)
       (d/transact! conn tx-data {:sync-download-graph? true}))))
 
+(defn import-rows-batch!
+  [{:keys [rows-db]} rows]
+  (when-not rows-db
+    (throw (ex-info "missing import rows db"
+                    {:type :db-sync/missing-field
+                     :field :rows-db})))
+  (let [data (map (fn [[addr content addresses]]
+                    #js {:$addr addr
+                         :$content content
+                         :$addresses addresses})
+                  rows)]
+    (upsert-addr-content! rows-db data))
+  (count rows))
+
+(defn- <ensure-import-rows-db!
+  [{:keys [import-id repo rows-db] :as state}]
+  (if rows-db
+    (p/resolved state)
+    (p/let [{:keys [rows-db rows-path]} (<create-import-temp-db! repo)]
+      (swap! *import-state
+             (fn [current]
+               (if (= import-id (:import-id current))
+                 (assoc current
+                        :rows-db rows-db
+                        :rows-path rows-path)
+                 current)))
+      (assoc state
+             :rows-db rows-db
+             :rows-path rows-path))))
+
+(defn- schema-datom?
+  [ident-eids schema-version-eid datom]
+  (or (= schema-version-eid (:e datom))
+      (and (contains? ident-eids (:e datom))
+           (or (= :db/ident (:a datom))
+               (= "db" (namespace (:a datom)))))))
+
+(defn- snapshot-datoms-in-import-order
+  [conn]
+  (let [db @conn
+        schema-version-eid (some-> (d/entity db :logseq.kv/schema-version) :db/id)
+        ident-eids (into #{}
+                         (map :e)
+                         (d/datoms db :avet :db/ident))
+        schema-datom?* #(schema-datom? ident-eids schema-version-eid %)
+        ordered-datoms (fn [pred]
+                         (sequence
+                          (comp (filter pred)
+                                (map #(select-keys % [:e :a :v])))
+                          (d/datoms db :eavt)))]
+    (concat (ordered-datoms schema-datom?*)
+            (ordered-datoms #(not (schema-datom?* %))))))
+
+(defn- take-import-datoms-batch
+  [datoms batch-size]
+  (loop [batch (transient [])
+         remaining (seq datoms)
+         n 0]
+    (if (or (nil? remaining)
+            (>= n batch-size))
+      [(persistent! batch) remaining]
+      (recur (conj! batch (first remaining))
+             (next remaining)
+             (inc n)))))
+
+(defn- <yield-next-tick
+  []
+  (js/Promise. (fn [resolve] (js/setTimeout resolve 0))))
+
+(declare log-import-progress!)
+
+(defn- <replay-imported-rows!
+  [{:keys [conn rows-db aes-key graph-e2ee? graph-id import-id]}]
+  (if (nil? rows-db)
+    (p/resolved nil)
+    (let [source-storage (new-sqlite-storage rows-db)
+          source-conn (common-sqlite/get-storage-conn source-storage db-schema/schema)]
+      (p/loop [remaining (seq (snapshot-datoms-in-import-order source-conn))]
+        (if (seq remaining)
+          (let [[batch remaining'] (take-import-datoms-batch remaining snapshot-import-datoms-batch-size)]
+            (p/let [_ (import-datoms-batch! conn aes-key graph-e2ee? batch)
+                    _ (log-import-progress! graph-id import-id (count batch))
+                    _ (<yield-next-tick)]
+              (p/recur remaining')))
+          (p/resolved nil))))))
+
 (defn- log-import-progress!
   [graph-id import-id datoms-count]
   (when (pos? datoms-count)
@@ -837,6 +958,9 @@
                                  :graph-id graph-id
                                  :import-id import-id
                                  :imported-datoms 0
+                                 :rows-db nil
+                                 :rows-imported? false
+                                 :rows-path nil
                                  :repo repo
                                  :total-datoms total-datoms})
           {:import-id import-id})
@@ -855,9 +979,27 @@
                    (clear-import-state! import-id))
                  (throw error)))))
 
+(def-thread-api :thread-api/db-sync-import-rows-chunk
+  [rows graph-id import-id]
+  (-> (p/let [state (require-import-state! nil graph-id import-id)
+              state (<ensure-import-rows-db! state)
+              _ (import-rows-batch! state rows)
+              _ (swap! *import-state
+                       (fn [current]
+                         (if (= import-id (:import-id current))
+                           (assoc current :rows-imported? true)
+                           current)))]
+        true)
+      (p/catch (fn [error]
+                 (when-not (= :db-sync/stale-import (:type (ex-data error)))
+                   (clear-import-state! import-id))
+                 (throw error)))))
+
 (def-thread-api :thread-api/db-sync-import-finalize
   [repo graph-id remote-tx import-id]
-  (-> (p/let [_ (require-import-state! repo graph-id import-id)
+  (-> (p/let [state (require-import-state! repo graph-id import-id)
+              _ (when (:rows-imported? state)
+                  (<replay-imported-rows! state))
               result (complete-datoms-import! repo graph-id remote-tx)
               _ (reset! *import-state nil)]
         result)

+ 2 - 5
src/main/frontend/worker/sync/client_op.cljs

@@ -99,11 +99,8 @@
 
 (defn get-local-tx
   [repo]
-  (let [conn (worker-state/get-client-ops-conn repo)]
-    (assert (some? conn) repo)
-    (let [r (:v (first (d/datoms @conn :avet :local-tx)))]
-      ;; (assert (some? r))
-      r)))
+  (when-let [conn (worker-state/get-client-ops-conn repo)]
+    (:v (first (d/datoms @conn :avet :local-tx)))))
 
 (defn get-pending-local-tx-count
   [repo]

+ 161 - 1
src/main/frontend/worker/sync/download.cljs

@@ -1,5 +1,5 @@
 (ns frontend.worker.sync.download
-  "Download helpers for db sync assets."
+  "Download helpers for db sync assets and graph snapshots."
   (:require [datascript.core :as d]
             [frontend.common.crypt :as crypt]
             [frontend.worker.state :as worker-state]
@@ -7,7 +7,9 @@
             [frontend.worker.sync.client-op :as client-op]
             [frontend.worker.sync.crypt :as sync-crypt]
             [frontend.worker.sync.large-title :as sync-large-title]
+            [frontend.worker.sync.transport :as sync-transport]
             [logseq.db :as ldb]
+            [logseq.db-sync.snapshot :as snapshot]
             [promesa.core :as p]))
 
 (defn exported-graph-aes-key
@@ -65,3 +67,161 @@
                         (broadcast-rtc-state!-f client))))
                   (p/catch (fn [e]
                              (js/console.error e)))))))))))
+
+(defn- ->uint8 [data]
+  (cond
+    (instance? js/Uint8Array data) data
+    (instance? js/ArrayBuffer data) (js/Uint8Array. data)
+    (string? data) (.encode (js/TextEncoder.) data)
+    :else (js/Uint8Array. data)))
+
+(defn- gzip-bytes?
+  [^js payload]
+  (and (some? payload)
+       (>= (.-byteLength payload) 2)
+       (= 31 (aget payload 0))
+       (= 139 (aget payload 1))))
+
+(defn- bytes->stream
+  [^js payload]
+  (js/ReadableStream.
+   #js {:start (fn [controller]
+                 (.enqueue controller payload)
+                 (.close controller))}))
+
+(defn- <decompress-gzip-bytes
+  [^js payload]
+  (if (exists? js/DecompressionStream)
+    (p/let [stream (bytes->stream payload)
+            decompressed (.pipeThrough stream (js/DecompressionStream. "gzip"))
+            resp (js/Response. decompressed)
+            buf (.arrayBuffer resp)]
+      (->uint8 buf))
+    (p/rejected (ex-info "gzip decompression not supported"
+                         {:type :db-sync/decompression-not-supported}))))
+
+(defn- <snapshot-response-bytes
+  [^js resp]
+  (p/let [buf (.arrayBuffer resp)
+          chunk (->uint8 buf)]
+    (if (gzip-bytes? chunk)
+      (<decompress-gzip-bytes chunk)
+      chunk)))
+
+(defn- response-body-stream
+  [^js resp]
+  (let [encoding (some-> resp .-headers (.get "content-encoding"))]
+    (cond
+      (nil? (.-body resp))
+      nil
+
+      (= "gzip" encoding)
+      (when (exists? js/DecompressionStream)
+        (.pipeThrough (.-body resp) (js/DecompressionStream. "gzip")))
+
+      :else
+      (.-body resp))))
+
+(defn- <flush-row-batches!
+  [rows batch-size on-batch]
+  (p/loop [remaining rows]
+    (if (>= (count remaining) batch-size)
+      (let [batch (subvec remaining 0 batch-size)
+            rest-rows (subvec remaining batch-size)]
+        (p/let [_ (on-batch batch)]
+          (p/recur rest-rows)))
+      remaining)))
+
+(defn- <stream-snapshot-row-batches!
+  [^js resp batch-size on-batch]
+  (if-let [stream (response-body-stream resp)]
+    (let [reader (.getReader stream)]
+      (p/loop [buffer nil
+               pending []]
+        (p/let [result (.read reader)]
+          (if (.-done result)
+            (let [pending (if (and buffer (pos? (.-byteLength buffer)))
+                            (into pending (snapshot/finalize-framed-buffer buffer))
+                            pending)]
+              (if (seq pending)
+                (p/let [_ (on-batch pending)]
+                  {:chunk-count 1})
+                {:chunk-count 0}))
+            (let [{rows :rows next-buffer :buffer} (snapshot/parse-framed-chunk buffer (->uint8 (.-value result)))
+                  pending (into pending rows)]
+              (p/let [pending (<flush-row-batches! pending batch-size on-batch)]
+                (p/recur next-buffer pending)))))))
+    (p/let [snapshot-bytes (<snapshot-response-bytes resp)
+            rows (vec (snapshot/finalize-framed-buffer snapshot-bytes))]
+      (if (seq rows)
+        (p/let [_ (on-batch rows)]
+          {:chunk-count 1})
+        {:chunk-count 0}))))
+
+(defn- with-auth-headers
+  [opts]
+  (sync-auth/with-auth-headers
+   #(sync-auth/auth-headers (worker-state/get-id-token))
+   opts))
+
+(defn- fetch-json
+  [url opts schema]
+  (sync-transport/fetch-json
+   with-auth-headers
+   url
+   opts
+   {:response-schema schema}))
+
+(defn download-graph-snapshot!
+  [repo graph-id graph-e2ee? {:keys [prepare-f import-rows-f finalize-f log-f]}]
+  (let [base (sync-auth/http-base-url @worker-state/*db-sync-config)]
+    (if (and (seq repo) (seq graph-id) (seq base))
+      (p/let [_ (when log-f
+                  (log-f {:sub-type :download-progress
+                          :graph-uuid graph-id
+                          :message "Preparing graph snapshot download"}))
+              pull-resp (fetch-json (str base "/sync/" graph-id "/pull")
+                                    {:method "GET"}
+                                    :sync/pull)
+              remote-tx (:t pull-resp)
+              _ (when-not (integer? remote-tx)
+                  (throw (ex-info "non-integer remote-tx when downloading graph"
+                                  {:repo repo
+                                   :remote-tx remote-tx})))
+              snapshot-resp (fetch-json (str base "/sync/" graph-id "/snapshot/download")
+                                        {:method "GET"}
+                                        :sync/snapshot-download)
+              resp (js/fetch (:url snapshot-resp)
+                             (clj->js (with-auth-headers {:method "GET"})))
+              _ (when log-f
+                  (log-f {:sub-type :download-progress
+                          :graph-uuid graph-id
+                          :message "Start downloading graph snapshot"}))]
+        (when-not (.-ok resp)
+          (throw (ex-info "snapshot download failed"
+                          {:repo repo
+                           :status (.-status resp)})))
+        (let [import-id* (atom nil)
+              ensure-import! (fn []
+                               (if-let [import-id @import-id*]
+                                 (p/resolved import-id)
+                                 (p/let [{:keys [import-id]} (prepare-f repo true graph-id graph-e2ee?)]
+                                   (reset! import-id* import-id)
+                                   import-id)))]
+          (p/let [_ (<stream-snapshot-row-batches!
+                     resp
+                     25000
+                     (fn [rows]
+                       (p/let [import-id (ensure-import!)]
+                         (import-rows-f rows graph-id import-id))))
+                  _ (when log-f
+                      (log-f {:sub-type :download-completed
+                              :graph-uuid graph-id
+                              :message "Graph snapshot downloaded"}))
+                  _ (when-let [import-id @import-id*]
+                      (finalize-f repo graph-id remote-tx import-id))]
+            true)))
+      (p/rejected (ex-info "db-sync missing graph download info"
+                           {:repo repo
+                            :graph-id graph-id
+                            :base base})))))

+ 29 - 159
src/test/frontend/handler/db_based/sync_test.cljs

@@ -7,35 +7,8 @@
             [frontend.handler.user :as user-handler]
             [frontend.state :as state]
             [logseq.db :as ldb]
-            [logseq.db-sync.snapshot :as snapshot]
             [promesa.core :as p]))
 
-(defn- encode-datoms-jsonl [datoms]
-  (snapshot/encode-datoms-jsonl datoms))
-
-(defn- <gzip-bytes [^js payload]
-  (if (exists? js/CompressionStream)
-    (p/let [stream (js/ReadableStream.
-                    #js {:start (fn [controller]
-                                  (.enqueue controller payload)
-                                  (.close controller))})
-            compressed (.pipeThrough stream (js/CompressionStream. "gzip"))
-            resp (js/Response. compressed)
-            buf (.arrayBuffer resp)]
-      (js/Uint8Array. buf))
-    (p/resolved payload)))
-
-(defn- bytes->stream
-  [^js payload chunk-size]
-  (js/ReadableStream.
-   #js {:start (fn [controller]
-                 (loop [offset 0]
-                   (when (< offset (.-byteLength payload))
-                     (.enqueue controller (.slice payload offset (min (+ offset chunk-size)
-                                                                      (.-byteLength payload))))
-                     (recur (+ offset chunk-size))))
-                 (.close controller))}))
-
 (deftest remove-member-request-test
   (async done
          (let [called (atom nil)]
@@ -390,154 +363,51 @@
                (p/finally (fn []
                             (reset! state/*db-worker worker-prev)))))))
 
-(deftest rtc-download-graph-imports-snapshot-once-test
+(deftest rtc-download-graph-delegates-to-worker-download-api-test
   (async done
-         (let [import-calls (atom [])
-               fetch-calls (atom [])
-               datoms [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true}
-                       {:e 2 :a :block/title :v "hello" :tx 1 :added true}]
-               jsonl-bytes (encode-datoms-jsonl datoms)
-               original-fetch js/fetch
-               download-url "http://base/sync/graph-1/snapshot/download"
-               asset-url "http://base/assets/graph-1/snapshot-1.snapshot"]
-           (-> (p/let [gzip-bytes (<gzip-bytes jsonl-bytes)]
-                 (set! js/fetch
-                       (fn [url opts]
-                         (let [method (or (aget opts "method") "GET")]
-                           (swap! fetch-calls conj [url method])
-                           (cond
-                             (and (= url asset-url) (= method "GET"))
-                             (js/Promise.resolve
-                              #js {:ok true
-                                   :status 200
-                                   :headers #js {:get (fn [header]
-                                                        (when (= header "content-length")
-                                                          (str (.-byteLength gzip-bytes))))}
-                                   :arrayBuffer (fn [] (js/Promise.resolve (.-buffer gzip-bytes)))})
-
-                             :else
-                             (js/Promise.resolve
-                              #js {:ok true
-                                   :status 200})))))
-                 (-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
-                                     db-sync/fetch-json (fn [url _opts _schema]
-                                                          (cond
-                                                            (string/ends-with? url "/pull")
-                                                            (p/resolved {:t 42})
-
-                                                            (= url download-url)
-                                                            (p/resolved {:ok true
-                                                                         :url asset-url
-                                                                         :key "graph-1/snapshot-1.snapshot"
-                                                                         :content-encoding "gzip"})
-
-                                                            :else
-                                                            (p/rejected (ex-info "unexpected fetch-json URL"
-                                                                                 {:url url}))))
-                                     user-handler/task--ensure-id&access-token (fn [resolve _reject]
-                                                                                 (resolve true))
-                                     state/<invoke-db-worker (fn [& args]
-                                                               (swap! import-calls conj args)
-                                                               (if (= :thread-api/db-sync-import-prepare (first args))
-                                                                 (p/resolved {:import-id "import-1"})
-                                                                 (p/resolved :ok)))
-                                     state/set-state! (fn [& _] nil)
-                                     state/pub-event! (fn [& _] nil)]
-                       (db-sync/<rtc-download-graph! "demo-graph" "graph-1" false))
-                     (p/finally (fn [] (set! js/fetch original-fetch)))))
+         (let [worker-calls (atom [])]
+           (-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
+                               user-handler/task--ensure-id&access-token (fn [resolve _reject]
+                                                                           (resolve true))
+                               state/<invoke-db-worker (fn [& args]
+                                                         (swap! worker-calls conj args)
+                                                         (p/resolved :ok))
+                               state/set-state! (fn [& _] nil)
+                               state/pub-event! (fn [& _] nil)]
+                 (db-sync/<rtc-download-graph! "demo-graph" "graph-1" false))
                (p/then (fn [_]
-                         (is (= 3 (count @import-calls)))
-                         (let [[prepare-op graph reset? graph-uuid graph-e2ee?] (first @import-calls)
-                               [chunk-op imported-datoms chunk-graph-uuid import-id] (second @import-calls)
-                               [finalize-op finalize-graph finalize-graph-uuid remote-tx finalize-import-id] (nth @import-calls 2)]
-                           (is (= :thread-api/db-sync-import-prepare prepare-op))
+                         (is (= 1 (count @worker-calls)))
+                         (let [[op graph graph-uuid graph-e2ee?] (first @worker-calls)]
+                           (is (= :thread-api/db-sync-download-graph op))
                            (is (string/ends-with? graph "demo-graph"))
-                           (is (= true reset?))
                            (is (= "graph-1" graph-uuid))
-                           (is (= false graph-e2ee?))
-                           (is (= :thread-api/db-sync-import-datoms-chunk chunk-op))
-                           (is (= datoms imported-datoms))
-                           (is (= "graph-1" chunk-graph-uuid))
-                           (is (= "import-1" import-id))
-                           (is (= :thread-api/db-sync-import-finalize finalize-op))
-                           (is (string/ends-with? finalize-graph "demo-graph"))
-                           (is (= "graph-1" finalize-graph-uuid))
-                           (is (= 42 remote-tx))
-                           (is (= "import-1" finalize-import-id)))
-                         (is (= [[asset-url "GET"]]
-                                @fetch-calls))
+                           (is (= false graph-e2ee?)))
                          (done)))
                (p/catch (fn [error]
-                          (set! js/fetch original-fetch)
                           (is false (str error))
                           (done)))))))
 
-(deftest rtc-download-graph-streams-gzip-snapshot-test
+(deftest rtc-download-graph-sets-and-clears-downloading-state-test
   (async done
-         (let [import-calls (atom [])
-               datoms [{:e 1 :a :db/ident :v :logseq.class/Page :tx 1 :added true}
-                       {:e 2 :a :block/title :v "hello" :tx 1 :added true}]
-               jsonl-bytes (encode-datoms-jsonl datoms)
-               original-fetch js/fetch
-               download-url "http://base/sync/graph-1/snapshot/download"
-               asset-url "http://base/assets/graph-1/snapshot-1.snapshot"
+         (let [state-calls (atom [])
                worker-prev @state/*db-worker]
            (reset! state/*db-worker nil)
-           (-> (p/let [gzip-bytes (<gzip-bytes jsonl-bytes)
-                       stream (bytes->stream gzip-bytes 3)]
-                 (set! js/fetch
-                       (fn [url opts]
-                         (let [method (or (aget opts "method") "GET")]
-                           (cond
-                             (and (= url asset-url) (= method "GET"))
-                             (js/Promise.resolve
-                              #js {:ok true
-                                   :status 200
-                                   :headers #js {:get (fn [header]
-                                                        (case header
-                                                          "content-length" (str (.-byteLength gzip-bytes))
-                                                          "content-encoding" "gzip"
-                                                          nil))}
-                                   :body stream
-                                   :arrayBuffer (fn [] (throw (js/Error. "arrayBuffer should not be used")))})
-                             :else
-                             (js/Promise.resolve #js {:ok false :status 404})))))
-                 (-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
-                                     db-sync/fetch-json (fn [url _opts _schema]
-                                                          (cond
-                                                            (string/ends-with? url "/pull")
-                                                            (p/resolved {:t 42})
-
-                                                            (= url download-url)
-                                                            (p/resolved {:ok true
-                                                                         :url asset-url
-                                                                         :key "graph-1/snapshot-1.snapshot"
-                                                                         :content-encoding "gzip"})
-
-                                                            :else
-                                                            (p/rejected (ex-info "unexpected fetch-json URL"
-                                                                                 {:url url}))))
-                                     user-handler/task--ensure-id&access-token (fn [resolve _reject]
-                                                                                 (resolve true))
-                                     state/<invoke-db-worker (fn [& args]
-                                                               (swap! import-calls conj args)
-                                                               (if (= :thread-api/db-sync-import-prepare (first args))
-                                                                 (p/resolved {:import-id "import-1"})
-                                                                 (p/resolved :ok)))
-                                     state/set-state! (fn [& _] nil)
-                                     state/pub-event! (fn [& _] nil)]
-                       (db-sync/<rtc-download-graph! "demo-graph" "graph-1" false))
-                     (p/finally (fn [] (set! js/fetch original-fetch)))))
+           (-> (p/with-redefs [db-sync/http-base (fn [] "http://base")
+                               user-handler/task--ensure-id&access-token (fn [resolve _reject]
+                                                                           (resolve true))
+                               state/<invoke-db-worker (fn [& _] (p/resolved :ok))
+                               state/pub-event! (fn [& _] nil)
+                               state/set-state! (fn [k v]
+                                                  (swap! state-calls conj [k v])
+                                                  nil)]
+                 (db-sync/<rtc-download-graph! "demo-graph" "graph-1" false))
                (p/then (fn [_]
-                         (is (= 3 (count @import-calls)))
-                         (let [[chunk-op imported-datoms _ import-id] (second @import-calls)]
-                           (is (= :thread-api/db-sync-import-datoms-chunk chunk-op))
-                           (is (= datoms imported-datoms))
-                           (is (= "import-1" import-id)))
+                         (is (= [[:rtc/downloading-graph-uuid "graph-1"]
+                                 [:rtc/downloading-graph-uuid nil]]
+                                @state-calls))
                          (done)))
                (p/catch (fn [error]
                           (reset! state/*db-worker worker-prev)
-                          (set! js/fetch original-fetch)
                           (is false (str error))
                           (done)))
                (p/finally (fn []

+ 69 - 0
src/test/frontend/worker/db_worker_test.cljs

@@ -333,6 +333,75 @@
                                  (is false (str error))
                                  (done)))))))))))
 
+(deftest db-sync-import-rows-chunk-calls-import-rows-batch-test
+  (async done
+         (restoring-worker-state
+          (fn []
+            (let [prepare (@thread-api/*thread-apis :thread-api/db-sync-import-prepare)
+                  rows-chunk (@thread-api/*thread-apis :thread-api/db-sync-import-rows-chunk)
+                  conn (d/create-conn db-schema/schema)
+                  rows [[1 "row-1" nil]
+                        [2 "row-2" nil]]
+                  captured-rows (atom nil)]
+              (with-fake-create-or-open-db
+                test-repo conn
+                (fn []
+                  (-> (p/with-redefs [db-worker/close-db! (fn [_] nil)
+                                      db-worker/<invalidate-search-db! (fn [_] (p/resolved nil))
+                                      rtc-log-and-state/rtc-log (fn [& _] nil)
+                                      db-worker/<ensure-import-rows-db! (fn [state]
+                                                                          (p/resolved state))
+                                      db-worker/import-rows-batch! (fn [_state rows*]
+                                                                     (reset! captured-rows rows*)
+                                                                     (p/resolved 2))]
+                        (p/let [{:keys [import-id]} (prepare test-repo true "graph-1" false)
+                                _ (rows-chunk rows "graph-1" import-id)]
+                          (is (= rows @captured-rows))
+                          (done)))
+                      (p/catch (fn [error]
+                                 (is false (str error))
+                                 (done)))))))))))
+
+(deftest snapshot-datoms-in-import-order-puts-schema-before-data-test
+  (let [conn (d/create-conn db-schema/schema)]
+    (d/transact! conn [{:db/ident :logseq.kv/schema-version
+                        :kv/value {:major 65 :minor 0}}
+                       {:db/ident :user.test/attr
+                        :db/valueType :db.type/string
+                        :db/cardinality :db.cardinality/one}
+                       {:db/id 100
+                        :user.test/attr "hello"}])
+    (let [ordered (vec (#'db-worker/snapshot-datoms-in-import-order conn))
+          data-idx (first (keep-indexed (fn [idx datom]
+                                          (when (and (= 100 (:e datom))
+                                                     (= :user.test/attr (:a datom)))
+                                            idx))
+                                        ordered))
+          attr-eid (:db/id (d/entity @conn :user.test/attr))
+          ident-idx (first (keep-indexed (fn [idx datom]
+                                           (when (and (= attr-eid (:e datom))
+                                                      (= :db/ident (:a datom)))
+                                             idx))
+                                         ordered))
+          cardinality-idx (first (keep-indexed (fn [idx datom]
+                                                 (when (and (= attr-eid (:e datom))
+                                                            (= :db/cardinality (:a datom)))
+                                                   idx))
+                                               ordered))
+          schema-version-eid (:db/id (d/entity @conn :logseq.kv/schema-version))
+          schema-version-idx (first (keep-indexed (fn [idx datom]
+                                                    (when (and (= schema-version-eid (:e datom))
+                                                               (= :db/ident (:a datom)))
+                                                      idx))
+                                                  ordered))]
+      (is (number? data-idx))
+      (is (number? ident-idx))
+      (is (number? cardinality-idx))
+      (is (number? schema-version-idx))
+      (is (< schema-version-idx data-idx))
+      (is (< ident-idx data-idx))
+      (is (< cardinality-idx data-idx)))))
+
 (deftest thread-api-validate-db-passes-sync-diagnostics-test
   (restoring-worker-state
    (fn []