Преглед изворни кода

fix: both upload and download graph

tested for 4k movies
Tienson Qin пре 1 месец
родитељ
комит
274e7b63ef

+ 8 - 11
deps/worker-sync/src/logseq/worker_sync/worker.cljs

@@ -204,16 +204,13 @@
       (storage/init-schema! sql)
       (storage/set-t! sql 0))
     (when (and rows (pos? (.-length rows)))
-      (doseq [row (array-seq rows)]
-        (let [addr (aget row "addr")
-              content (aget row "content")
-              addresses (aget row "addresses")]
-          (common/sql-exec sql
-                           (str "insert into kvs (addr, content, addresses) values (?, ?, ?)"
-                                " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses")
-                           addr
-                           content
-                           addresses))))
+      (doseq [[addr content addresses] (array-seq rows)]
+        (common/sql-exec sql
+                         (str "insert into kvs (addr, content, addresses) values (?, ?, ?)"
+                              " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses")
+                         addr
+                         content
+                         addresses)))
     (set! (.-conn self) (storage/open-conn sql))
     (let [conn (.-conn self)]
       (prn :debug :import-datoms-count (count (d/datoms @conn :eavt))))))
@@ -369,7 +366,7 @@
             (common/json-response (snapshot-response self))
 
             (and (= method "GET") (= path "/snapshot/rows"))
-            (let [after (or (parse-int (.get (.-searchParams url) "after")) 0)
+            (let [after (or (parse-int (.get (.-searchParams url) "after")) -1)
                   limit (or (parse-int (.get (.-searchParams url) "limit"))
                             snapshot-rows-default-limit)
                   limit (-> limit

+ 1 - 1
src/main/frontend/handler/db_based/sync.cljs

@@ -23,7 +23,7 @@
 
 (defn <rtc-download-graph! [graph-name graph-uuid graph-schema-version timeout-ms]
   (if (worker-sync-enabled?)
-    (worker-sync-handler/<rtc-download-graph! graph-name graph-uuid graph-schema-version timeout-ms)
+    (worker-sync-handler/<rtc-download-graph! graph-name graph-uuid graph-schema-version)
     (rtc-handler/<rtc-download-graph! graph-name graph-uuid graph-schema-version timeout-ms)))
 
 (defn <rtc-stop! []

+ 8 - 8
src/main/frontend/handler/db_based/worker_sync.cljs

@@ -90,12 +90,12 @@
                             :base base})))))
 
 (defn <rtc-download-graph!
-  [graph-name graph-uuid _graph-schema-version timeout-ms]
+  [graph-name graph-uuid _graph-schema-version]
   (state/set-state! :rtc/downloading-graph-uuid graph-uuid)
   (let [base (http-base)]
     (-> (if (and graph-uuid base)
           (p/let [graph (str config/db-version-prefix graph-name)]
-            (p/loop [after 0
+            (p/loop [after -1           ; root addr is 0
                      first-batch? true]
               (p/let [resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/rows"
                                             "?after=" after "&limit=" snapshot-rows-limit)
@@ -103,18 +103,18 @@
                       rows (js->clj (aget resp "rows") :keywordize-keys true)
                       done? (true? (aget resp "done"))
                       last-addr (or (aget resp "last_addr") after)]
-                (state/<invoke-db-worker :thread-api/worker-sync-import-kvs-rows
-                                         graph rows first-batch?)
-                (if done?
-                  (state/<invoke-db-worker :thread-api/worker-sync-finalize-kvs-import graph)
-                  (p/recur last-addr false)))))
+                (p/do!
+                 (state/<invoke-db-worker :thread-api/worker-sync-import-kvs-rows
+                                          graph rows first-batch?)
+                 (if done?
+                   (state/<invoke-db-worker :thread-api/worker-sync-finalize-kvs-import graph)
+                   (p/recur last-addr false))))))
           (p/rejected (ex-info "worker-sync missing graph info"
                                {:type :worker-sync/invalid-graph
                                 :graph-uuid graph-uuid
                                 :base base})))
         (p/catch (fn [error]
                    (throw error)))
-        (p/timeout timeout-ms)
         (p/finally
           (fn []
             (state/set-state! :rtc/downloading-graph-uuid nil))))))

+ 5 - 6
src/main/frontend/worker/db_worker.cljs

@@ -614,13 +614,12 @@
 
 (def-thread-api :thread-api/worker-sync-finalize-kvs-import
   [repo]
-  (when-let [^js db (get @*worker-sync-import-dbs repo)]
+  (p/let [^js db (get @*worker-sync-import-dbs repo)]
     (.close db)
-    (swap! *worker-sync-import-dbs dissoc repo))
-  (p/do!
-   ((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? false})
-   ((@thread-api/*thread-apis :thread-api/export-db) repo)
-   (shared-service/broadcast-to-clients! :add-repo {:repo repo})))
+    (swap! *worker-sync-import-dbs dissoc repo)
+    ((@thread-api/*thread-apis :thread-api/create-or-open-db) repo {:close-other-db? true})
+    ((@thread-api/*thread-apis :thread-api/export-db) repo)
+    (shared-service/broadcast-to-clients! :add-repo {:repo repo})))
 
 (def-thread-api :thread-api/unsafe-unlink-db
   [repo]

+ 2 - 4
src/main/frontend/worker/worker_sync.cljs

@@ -562,7 +562,7 @@
   [db last-addr limit]
   (.exec db #js {:sql "select addr, content, addresses from kvs where addr > ? order by addr asc limit ?"
                  :bind #js [last-addr limit]
-                 :rowMode "object"}))
+                 :rowMode "array"}))
 
 (defn upload-graph!
   [repo]
@@ -585,9 +585,7 @@
                                               #js {:done true})})]
                   (client-op/add-all-exists-asset-as-ops repo)
                   {:graph-id graph-id})
-                (let [max-addr (apply max (map (fn [row] (aget row "addr")) rows))]
-                  (prn :debug :max-addr max-addr
-                       :rows (map (fn [row] (aget row "addr")) rows))
+                (let [max-addr (apply max (map first rows))]
                   (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
                                         {:method "POST"
                                          :headers {"content-type" "application/json"}