Tienson Qin 1 месяц назад
Родитель
Сommit
36488b5a2d

+ 2 - 2
deps/worker-sync/src/logseq/worker_sync/common.cljs

@@ -7,8 +7,8 @@
 
 (defn- cors-headers []
   #js {"access-control-allow-origin" "*"
-       "access-control-allow-headers" "content-type"
-       "access-control-allow-methods" "GET,POST,DELETE,OPTIONS"})
+       "access-control-allow-headers" "content-type,x-amz-meta-checksum,x-amz-meta-type"
+       "access-control-allow-methods" "GET,POST,PUT,DELETE,OPTIONS"})
 
 (defn json-response
   ([data] (json-response data 200))

+ 62 - 50
deps/worker-sync/src/logseq/worker_sync/worker.cljs

@@ -306,56 +306,68 @@
     path))
 
 (defn- handle-http [^js self request]
-  (let [url (js/URL. (.-url request))
-        raw-path (.-pathname url)
-        path (strip-sync-prefix raw-path)
-        method (.-method request)]
-    (cond
-      (= method "OPTIONS")
-      (common/options-response)
-
-      (and (= method "GET") (= path "/health"))
-      (common/json-response {:ok true})
-
-      (and (= method "GET") (= path "/pull"))
-      (let [since (or (parse-int (.get (.-searchParams url) "since")) 0)]
-        (common/json-response (pull-response self since)))
-
-      (and (= method "GET") (= path "/snapshot"))
-      (common/json-response (snapshot-response self))
-
-      (and (= method "DELETE") (= path "/admin/reset"))
-      (do
-        (common/sql-exec (.-sql self) "drop table if exists kvs")
-        (common/sql-exec (.-sql self) "drop table if exists tx_log")
-        (common/sql-exec (.-sql self) "drop table if exists sync_meta")
-        (storage/init-schema! (.-sql self))
-        (common/json-response {:ok true}))
-
-      (and (= method "POST") (= path "/tx"))
-      (.then (common/read-json request)
-             (fn [result]
-               (if (nil? result)
-                 (common/bad-request "missing body")
-                 (let [tx-data (protocol/transit->tx (aget result "tx"))
-                       t-before (parse-int (aget result "t_before"))]
-                   (if (sequential? tx-data)
-                     (common/json-response (handle-tx! self nil tx-data t-before))
-                     (common/bad-request "invalid tx"))))))
-
-      (and (= method "POST") (= path "/tx/batch"))
-      (.then (common/read-json request)
-             (fn [result]
-               (if (nil? result)
-                 (common/bad-request "missing body")
-                 (let [txs (js->clj (aget result "txs"))
-                       t-before (parse-int (aget result "t_before"))]
-                   (if (and (sequential? txs) (every? string? txs))
-                     (common/json-response (handle-tx-batch! self nil txs t-before))
-                     (common/bad-request "invalid tx"))))))
-
-      :else
-      (common/not-found))))
+  (letfn [(with-cors-error [resp]
+            (if (instance? js/Promise resp)
+              (.catch resp
+                      (fn [e]
+                        (log/error :worker-sync/http-error {:error e})
+                        (common/json-response {:error "server error"} 500)))
+              resp))]
+    (try
+      (let [url (js/URL. (.-url request))
+            raw-path (.-pathname url)
+            path (strip-sync-prefix raw-path)
+            method (.-method request)]
+        (with-cors-error
+          (cond
+            (= method "OPTIONS")
+            (common/options-response)
+
+            (and (= method "GET") (= path "/health"))
+            (common/json-response {:ok true})
+
+            (and (= method "GET") (= path "/pull"))
+            (let [since (or (parse-int (.get (.-searchParams url) "since")) 0)]
+              (common/json-response (pull-response self since)))
+
+            (and (= method "GET") (= path "/snapshot"))
+            (common/json-response (snapshot-response self))
+
+            (and (= method "DELETE") (= path "/admin/reset"))
+            (do
+              (common/sql-exec (.-sql self) "drop table if exists kvs")
+              (common/sql-exec (.-sql self) "drop table if exists tx_log")
+              (common/sql-exec (.-sql self) "drop table if exists sync_meta")
+              (storage/init-schema! (.-sql self))
+              (common/json-response {:ok true}))
+
+            (and (= method "POST") (= path "/tx"))
+            (.then (common/read-json request)
+                   (fn [result]
+                     (if (nil? result)
+                       (common/bad-request "missing body")
+                       (let [tx-data (protocol/transit->tx (aget result "tx"))
+                             t-before (parse-int (aget result "t_before"))]
+                         (if (sequential? tx-data)
+                           (common/json-response (handle-tx! self nil tx-data t-before))
+                           (common/bad-request "invalid tx"))))))
+
+            (and (= method "POST") (= path "/tx/batch"))
+            (.then (common/read-json request)
+                   (fn [result]
+                     (if (nil? result)
+                       (common/bad-request "missing body")
+                       (let [txs (js->clj (aget result "txs"))
+                             t-before (parse-int (aget result "t_before"))]
+                         (if (and (sequential? txs) (every? string? txs))
+                           (common/json-response (handle-tx-batch! self nil txs t-before))
+                           (common/bad-request "invalid tx"))))))
+
+            :else
+            (common/not-found))))
+      (catch :default e
+        (log/error :worker-sync/http-error {:error e})
+        (common/json-response {:error "server error"} 500)))))
 
 (defclass SyncDO
   (extends DurableObject)

+ 1 - 2
src/main/frontend/components/repo.cljs

@@ -122,8 +122,7 @@
                                  token (state/get-auth-id-token)
                                  remote-graph-name (config/db-graph-name (state/get-current-repo))]
                              (when (and token remote-graph-name)
-                               (state/<invoke-db-worker :thread-api/rtc-async-upload-graph
-                                                        repo token remote-graph-name)
+                               (rtc-handler/<rtc-upload-graph! repo token remote-graph-name)
                                (when (util/mobile?)
                                  (shui/popup-show! nil
                                                    (fn []

+ 8 - 1
src/main/frontend/handler/db_based/sync.cljs

@@ -2,7 +2,8 @@
   "Dispatch RTC calls between legacy RTC and worker-sync implementations."
   (:require [frontend.config :as config]
             [frontend.handler.db-based.rtc :as rtc-handler]
-            [frontend.handler.db-based.worker-sync :as worker-sync-handler]))
+            [frontend.handler.db-based.worker-sync :as worker-sync-handler]
+            [frontend.state :as state]))
 
 (defn- worker-sync-enabled? []
   config/worker-sync-enabled?)
@@ -22,6 +23,12 @@
     (worker-sync-handler/<rtc-download-graph! graph-name graph-uuid graph-schema-version timeout-ms)
     (rtc-handler/<rtc-download-graph! graph-name graph-uuid graph-schema-version timeout-ms)))
 
+(defn <rtc-upload-graph! [repo token remote-graph-name]
+  (if (worker-sync-enabled?)
+    (state/<invoke-db-worker :thread-api/worker-sync-upload-graph repo)
+    (state/<invoke-db-worker :thread-api/rtc-async-upload-graph
+                             repo token remote-graph-name)))
+
 (defn <rtc-stop! []
   (if (worker-sync-enabled?)
     (worker-sync-handler/<rtc-stop!)

+ 4 - 0
src/main/frontend/worker/db_worker.cljs

@@ -404,6 +404,10 @@
   []
   (worker-sync/stop!))
 
+(def-thread-api :thread-api/worker-sync-upload-graph
+  [repo]
+  (worker-sync/upload-graph! repo))
+
 (def-thread-api :thread-api/set-infer-worker-proxy
   [infer-worker-proxy]
   (reset! worker-state/*infer-worker infer-worker-proxy)

+ 69 - 0
src/main/frontend/worker/worker_sync.cljs

@@ -35,6 +35,7 @@
           (string/replace base #"/sync/%s$" "")))))
 
 (def ^:private max-asset-size (* 100 1024 1024))
+(def ^:private upload-batch-size 2000)
 
 (defn- format-ws-url [base graph-id]
   (cond
@@ -91,6 +92,18 @@
     (catch :default _
       nil)))
 
+(defn- fetch-json
+  [url opts]
+  (p/let [resp (js/fetch url (clj->js opts))
+          text (.text resp)
+          data (when (seq text) (js/JSON.parse text))]
+    (if (.-ok resp)
+      (js->clj data :keywordize-keys true)
+      (throw (ex-info "worker-sync request failed"
+                      {:status (.-status resp)
+                       :url url
+                       :body data})))))
+
 (defn- update-server-t! [client t]
   (when (number? t)
     (reset! (:server-t client) t)))
@@ -553,3 +566,59 @@
     (enqueue-local-tx! repo tx-report)
     (when-let [client (get @worker-state/*worker-sync-clients repo)]
       (enqueue-asset-sync! repo client))))
+
+(def ^:private upload-max-retries 3)
+
+(defn upload-graph!
+  [repo]
+  (let [base (http-base-url)
+        graph-id (get-graph-id repo)]
+    (if-not (and (seq base) (seq graph-id))
+      (p/rejected (ex-info "worker-sync missing upload info"
+                           {:repo repo :base base :graph-id graph-id}))
+      (if-let [conn (worker-state/get-datascript-conn repo)]
+        (let [db @conn
+              datoms (seq (d/datoms db :eavt))]
+          (ensure-client-graph-uuid! repo graph-id)
+          (p/loop [remaining datoms
+                   t-before 0
+                   retries 0]
+            (if (empty? remaining)
+              (do
+                (client-op/add-all-exists-asset-as-ops repo)
+                {:graph-id graph-id})
+              (let [[chunk rest] (split-at upload-batch-size remaining)
+                    normalized (normalize-tx-data db db chunk)]
+                (if (empty? normalized)
+                  (p/recur (seq rest) t-before 0)
+                  (p/let [tx-str (sqlite-util/write-transit-str normalized)
+                          resp (fetch-json (str base "/sync/" graph-id "/tx/batch")
+                                           {:method "POST"
+                                            :headers {"content-type" "application/json"}
+                                            :body (js/JSON.stringify
+                                                   #js {:t_before t-before
+                                                        :txs #js [tx-str]})})]
+                    (cond
+                      (= "tx/batch/ok" (:type resp))
+                      (p/recur (seq rest) (:t resp) 0)
+
+                      (= "tx/reject" (:type resp))
+                      (if (= "stale" (:reason resp))
+                        (if (< retries upload-max-retries)
+                          (p/recur remaining (:t resp) (inc retries))
+                          (throw (ex-info "worker-sync upload stale limit"
+                                          {:repo repo
+                                           :graph-id graph-id
+                                           :response resp})))
+                        (throw (ex-info "worker-sync upload rejected"
+                                        {:repo repo
+                                         :graph-id graph-id
+                                         :response resp})))
+
+                      :else
+                      (throw (ex-info "worker-sync upload failed"
+                                      {:repo repo
+                                       :graph-id graph-id
+                                       :response resp})))))))))
+        (p/rejected (ex-info "worker-sync missing db"
+                             {:repo repo :graph-id graph-id}))))))