Tienson Qin 1 month ago
parent
commit
4b572a4fed

+ 88 - 3
deps/worker-sync/src/logseq/worker_sync/worker.cljs

@@ -15,14 +15,12 @@
 
 (glogi-console/install!)
 
+(declare handle-assets)
 (defn- handle-worker-fetch [request ^js env]
   (let [url (js/URL. (.-url request))
         path (.-pathname url)
         method (.-method request)]
     (cond
-      (= method "OPTIONS")
-      (common/options-response)
-
       (= path "/health")
       (common/json-response {:ok true})
 
@@ -33,6 +31,12 @@
             stub (.get namespace do-id)]
         (.fetch stub request))
 
+      (string/starts-with? path "/assets/")
+      (handle-assets request env)
+
+      (= method "OPTIONS")
+      (common/options-response)
+
       (string/starts-with? path "/sync/")
       (let [prefix (count "/sync/")
             rest-path (subs path prefix)
@@ -83,6 +87,87 @@
       (when-not (js/isNaN n)
         n))))
 
+(def ^:private max-asset-size (* 100 1024 1024))
+
+(defn- asset-cors-headers []
+  #js {"Access-Control-Allow-Origin" "*"
+       "Access-Control-Allow-Headers" "content-type,x-amz-meta-checksum,x-amz-meta-type"
+       "Access-Control-Allow-Methods" "GET, PUT, DELETE, OPTIONS"})
+
+(defn- parse-asset-path [path]
+  (let [prefix "/assets/"]
+    (when (string/starts-with? path prefix)
+      (let [rest-path (subs path (count prefix))
+            slash-idx (string/index-of rest-path "/")
+            graph-id (when (and slash-idx (pos? slash-idx)) (subs rest-path 0 slash-idx))
+            file (when (and slash-idx (pos? slash-idx)) (subs rest-path (inc slash-idx)))
+            dot-idx (when file (string/last-index-of file "."))
+            asset-uuid (when (and dot-idx (pos? dot-idx)) (subs file 0 dot-idx))
+            asset-type (when (and dot-idx (pos? dot-idx)) (subs file (inc dot-idx)))]
+        (when (and (seq graph-id) (seq asset-uuid) (seq asset-type))
+          {:graph-id graph-id
+           :asset-uuid asset-uuid
+           :asset-type asset-type
+           :key (str graph-id "/" asset-uuid "." asset-type)})))))
+
+(defn- handle-assets [request ^js env]
+  (let [url (js/URL. (.-url request))
+        path (.-pathname url)
+        method (.-method request)]
+    (cond
+      (= method "OPTIONS")
+      (js/Response. nil #js {:status 204 :headers (asset-cors-headers)})
+
+      :else
+      (if-let [{:keys [key asset-type]} (parse-asset-path path)]
+        (let [^js bucket (.-LOGSEQ_SYNC_ASSETS env)]
+          (if-not bucket
+            (js/Response. (js/JSON.stringify #js {:error "missing assets bucket"})
+                          #js {:status 500 :headers (asset-cors-headers)})
+            (case method
+              "GET"
+              (.then (.get bucket key)
+                     (fn [^js obj]
+                       (if (nil? obj)
+                         (js/Response. (js/JSON.stringify #js {:error "not found"})
+                                       #js {:status 404 :headers (asset-cors-headers)})
+                         (let [content-type (or (.-contentType (.-httpMetadata obj))
+                                                "application/octet-stream")]
+                           (js/Response. (.-body obj)
+                                         #js {:status 200
+                                              :headers (js/Object.assign
+                                                        #js {"content-type" content-type
+                                                             "x-asset-type" asset-type}
+                                                        (asset-cors-headers))})))))
+
+              "PUT"
+              (.then (.arrayBuffer request)
+                     (fn [buf]
+                       (if (> (.-byteLength buf) max-asset-size)
+                         (js/Response. (js/JSON.stringify #js {:error "asset too large"})
+                                       #js {:status 413 :headers (asset-cors-headers)})
+                         (.then (.put bucket
+                                      key
+                                      buf
+                                      #js {:httpMetadata #js {:contentType (or (.get (.-headers request) "content-type")
+                                                                               "application/octet-stream")}
+                                           :customMetadata #js {:checksum (.get (.-headers request) "x-amz-meta-checksum")
+                                                                :type asset-type}})
+                                (fn [_]
+                                  (js/Response. (js/JSON.stringify #js {:ok true})
+                                                #js {:status 200 :headers (asset-cors-headers)}))))))
+
+              "DELETE"
+              (.then (.delete bucket key)
+                     (fn [_]
+                       (js/Response. (js/JSON.stringify #js {:ok true})
+                                     #js {:status 200 :headers (asset-cors-headers)})))
+
+              (js/Response. (js/JSON.stringify #js {:error "method not allowed"})
+                            #js {:status 405 :headers (asset-cors-headers)}))))
+        (js/Response. (js/JSON.stringify #js {:error "invalid asset path"})
+                      #js {:status 400 :headers (asset-cors-headers)})))))
+
 (defn- pull-response [^js self since]
   (let [sql (.-sql self)
         txs (storage/fetch-tx-since sql since)]

+ 4 - 0
deps/worker-sync/worker/wrangler.toml

@@ -10,6 +10,10 @@ enabled = true
 name = "LOGSEQ_SYNC_DO"
 class_name = "SyncDO"
 
+[[r2_buckets]]
+binding = "LOGSEQ_SYNC_ASSETS"
+bucket_name = "logseq-sync-assets"
+
 [[durable_objects.bindings]]
 name = "LOGSEQ_SYNC_INDEX_DO"
 class_name = "SyncIndexDO"

+ 2 - 1
src/main/frontend/persist_db/browser.cljs

@@ -147,7 +147,8 @@
        (-> (p/let [_ (state/<invoke-db-worker :thread-api/init config/RTC-WS-URL)
                    _ (state/<invoke-db-worker :thread-api/set-worker-sync-config
                                               {:enabled? config/worker-sync-enabled?
-                                               :ws-url config/worker-sync-ws-url})
+                                               :ws-url config/worker-sync-ws-url
+                                               :http-base config/worker-sync-http-base})
                    _ (sync-app-state!)
                    _ (log/info "init worker spent" (str (- (util/time-ms) t1) "ms"))
                    _ (sync-ui-state!)

+ 269 - 7
src/main/frontend/worker/worker_sync.cljs

@@ -2,8 +2,10 @@
   "Simple worker-sync client based on promesa + WebSocket."
   (:require [clojure.string :as string]
             [datascript.core :as d]
+            [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.state :as worker-state]
             [lambdaisland.glogi :as log]
+            [logseq.common.path :as path]
             [logseq.common.util :as common-util]
             [logseq.db :as ldb]
             [logseq.db.common.normalize :as db-normalize]
@@ -18,6 +20,22 @@
   []
   (:ws-url @worker-state/*worker-sync-config))
 
+(defn- http-base-url
+  []
+  (or (:http-base @worker-state/*worker-sync-config)
+      (when-let [ws-url (ws-base-url)]
+        (let [base (cond
+                     (string/starts-with? ws-url "wss://")
+                     (str "https://" (subs ws-url (count "wss://")))
+
+                     (string/starts-with? ws-url "ws://")
+                     (str "http://" (subs ws-url (count "ws://")))
+
+                     :else ws-url)]
+          (string/replace base #"/sync/%s$" "")))))
+
+(def ^:private max-asset-size (* 100 1024 1024))
+
 (defn- format-ws-url [base graph-id]
   (cond
     (string/includes? base "%s")
@@ -45,6 +63,10 @@
           (some-> (or local-uuid new-local) str)
           (when (string? repo) repo)))))
 
+(defn- ensure-client-graph-uuid! [repo graph-id]
+  (when (seq graph-id)
+    (client-op/update-graph-uuid repo graph-id)))
+
 (defn- ready-state [ws]
   (.-readyState ws))
 
@@ -73,11 +95,31 @@
   (when (number? t)
     (reset! (:server-t client) t)))
 
-(defn- apply-remote-tx! [repo tx-data]
+(def ^:private asset-update-attrs
+  #{:logseq.property.asset/remote-metadata
+    :logseq.property.asset/type
+    :logseq.property.asset/checksum})
+
+(defn- asset-uuids-from-tx [db tx-data]
+  (->> tx-data
+       (keep (fn [datom]
+               (when (and (:added datom)
+                          (contains? asset-update-attrs (:a datom)))
+                 (when-let [ent (d/entity db (:e datom))]
+                   (some-> (:block/uuid ent) str)))))
+       (distinct)))
+
+(declare enqueue-asset-downloads!)
+
+(defn- apply-remote-tx! [repo client tx-data]
   (when-let [conn (worker-state/get-datascript-conn repo)]
     (try
-      (let [tx-data' (db-normalize/de-normalize-tx-data @conn tx-data)]
-        (d/transact! conn tx-data' {:worker-sync/remote? true}))
+      (let [tx-data' (db-normalize/de-normalize-tx-data @conn tx-data)
+            tx-report (d/transact! conn tx-data' {:worker-sync/remote? true})
+            db-after (:db-after tx-report)
+            asset-uuids (asset-uuids-from-tx db-after (:tx-data tx-report))]
+        (when (seq asset-uuids)
+          (enqueue-asset-downloads! repo client asset-uuids)))
       (catch :default e
         (log/error :worker-sync/apply-remote-tx-failed {:error e})))))
 
@@ -103,12 +145,17 @@
 
 (declare flush-pending!)
 (declare remove-pending-txs!)
+
+(declare enqueue-asset-sync!)
+(declare enqueue-asset-initial-download!)
 (defn- handle-message! [repo client raw]
   (when-let [message (parse-message raw)]
     (case (:type message)
       "hello" (do
                 (update-server-t! client (:t message))
                 (send! (:ws client) {:type "pull" :since @(:server-t client)})
+                (enqueue-asset-sync! repo client)
+                (enqueue-asset-initial-download! repo client)
                 (flush-pending! repo client))
       "tx/ok" (do
                 (update-server-t! client (:t message))
@@ -144,7 +191,7 @@
                   (update-server-t! client (:t message))
                   (doseq [{:keys [tx]} (:txs message)]
                     (when tx
-                      (apply-remote-tx! repo (sqlite-util/read-transit-str tx)))))
+                      (apply-remote-tx! repo client (sqlite-util/read-transit-str tx)))))
       "snapshot/ok" (update-server-t! client (:t message))
       nil)))
 
@@ -153,10 +200,219 @@
       (let [client {:repo repo
                     :server-t (atom 0)
                     :send-queue (atom (p/resolved nil))
+                    :asset-queue (atom (p/resolved nil))
                     :inflight (atom [])}]
         (swap! worker-state/*worker-sync-clients assoc repo client)
         client)))
 
+(defn- asset-url [base graph-id asset-uuid asset-type]
+  (str base "/assets/" graph-id "/" asset-uuid "." asset-type))
+
+(defn- enqueue-asset-task! [client task]
+  (when-let [queue (:asset-queue client)]
+    (swap! queue
+           (fn [prev]
+             (p/then prev (fn [_] (task)))))))
+
+(defn- asset-type-from-files
+  [repo asset-uuid]
+  (p/let [paths (worker-state/<invoke-main-thread :thread-api/get-all-asset-file-paths repo)]
+    (some (fn [path]
+            (let [stem (path/file-stem path)
+                  ext (path/file-ext path)]
+              (when (and (seq stem) (seq ext) (= stem (str asset-uuid)))
+                ext)))
+          paths)))
+
+(defn- delete-remote-asset!
+  [repo graph-id asset-uuid asset-type]
+  (let [base (http-base-url)]
+    (if (and (seq base) (seq graph-id) (seq asset-type))
+      (p/let [url (asset-url base graph-id (str asset-uuid) asset-type)
+              resp (js/fetch url #js {:method "DELETE"})]
+        (when-not (.-ok resp)
+          (log/error :worker-sync/asset-delete-failed {:repo repo
+                                                       :asset-uuid asset-uuid
+                                                       :status (.-status resp)})))
+      (log/info :worker-sync/asset-delete-skipped {:repo repo
+                                                   :asset-uuid asset-uuid
+                                                   :reason :missing-base-or-type}))))
+
+(defn- upload-remote-asset!
+  [repo graph-id asset-uuid asset-type checksum]
+  (let [base (http-base-url)]
+    (if (and (seq base) (seq graph-id) (seq asset-type) (seq checksum))
+      (worker-state/<invoke-main-thread :thread-api/rtc-upload-asset
+                                        repo nil (str asset-uuid) asset-type checksum
+                                        (asset-url base graph-id (str asset-uuid) asset-type))
+      (p/rejected (ex-info "missing asset upload info"
+                           {:repo repo
+                            :asset-uuid asset-uuid
+                            :asset-type asset-type
+                            :checksum checksum
+                            :base base
+                            :graph-id graph-id})))))
+
+(defn- download-remote-asset!
+  [repo graph-id asset-uuid asset-type]
+  (let [base (http-base-url)]
+    (if (and (seq base) (seq graph-id) (seq asset-type))
+      (worker-state/<invoke-main-thread :thread-api/rtc-download-asset
+                                        repo nil (str asset-uuid) asset-type
+                                        (asset-url base graph-id (str asset-uuid) asset-type))
+      (p/rejected (ex-info "missing asset download info"
+                           {:repo repo
+                            :asset-uuid asset-uuid
+                            :asset-type asset-type
+                            :base base
+                            :graph-id graph-id})))))
+
+(defn- process-asset-op!
+  [repo graph-id asset-op]
+  (let [asset-uuid (:block/uuid asset-op)]
+    (cond
+      (contains? asset-op :update-asset)
+      (if-let [conn (worker-state/get-datascript-conn repo)]
+        (let [ent (d/entity @conn [:block/uuid asset-uuid])
+              asset-type (:logseq.property.asset/type ent)
+              checksum (:logseq.property.asset/checksum ent)
+              size (:logseq.property.asset/size ent 0)]
+          (cond
+            (or (nil? ent) (nil? asset-type) (nil? checksum))
+            (do
+              (client-op/remove-asset-op repo asset-uuid)
+              (p/resolved nil))
+
+            (> size max-asset-size)
+            (do
+              (log/info :worker-sync/asset-too-large {:repo repo
+                                                      :asset-uuid asset-uuid
+                                                      :size size})
+              (client-op/remove-asset-op repo asset-uuid)
+              (p/resolved nil))
+
+            :else
+            (-> (upload-remote-asset! repo graph-id asset-uuid asset-type checksum)
+                (p/then (fn [_]
+                          (when (d/entity @conn [:block/uuid asset-uuid])
+                            (ldb/transact!
+                             conn
+                             [{:block/uuid asset-uuid
+                               :logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
+                             {:persist-op? false}))
+                          (client-op/remove-asset-op repo asset-uuid)))
+                (p/catch (fn [e]
+                           (case (:type (ex-data e))
+                             :rtc.exception/read-asset-failed
+                             (client-op/remove-asset-op repo asset-uuid)
+
+                             :rtc.exception/upload-asset-failed
+                             nil
+
+                             (log/error :worker-sync/asset-upload-failed
+                                        {:repo repo
+                                         :asset-uuid asset-uuid
+                                         :error e})))))))
+        (p/resolved nil))
+
+      (contains? asset-op :remove-asset)
+      (-> (p/let [conn (worker-state/get-datascript-conn repo)
+                  ent (when conn (d/entity @conn [:block/uuid asset-uuid]))
+                  asset-type (if (seq (:logseq.property.asset/type ent))
+                               (:logseq.property.asset/type ent)
+                               (asset-type-from-files repo asset-uuid))]
+            (p/do!
+             (when (seq asset-type)
+               (delete-remote-asset! repo graph-id asset-uuid asset-type))
+             (client-op/remove-asset-op repo asset-uuid)))
+          (p/catch (fn [e]
+                     (log/error :worker-sync/asset-delete-failed
+                                {:repo repo
+                                 :asset-uuid asset-uuid
+                                 :error e}))))
+
+      :else
+      (p/resolved nil))))
+
+(defn- process-asset-ops!
+  [repo client]
+  (let [graph-id (:graph-id client)
+        asset-ops (not-empty (client-op/get-all-asset-ops repo))]
+    (if (and (seq graph-id) asset-ops)
+      (p/loop [ops asset-ops]
+        (if (empty? ops)
+          nil
+          (p/do!
+           (process-asset-op! repo graph-id (first ops))
+           (p/recur (rest ops)))))
+      (p/resolved nil))))
+
+(defn- enqueue-asset-sync! [repo client]
+  (enqueue-asset-task! client #(process-asset-ops! repo client)))
+
+(defn- enqueue-asset-downloads!
+  [repo client asset-uuids]
+  (when (seq asset-uuids)
+    (enqueue-asset-task! client
+                         (fn []
+                           (let [graph-id (:graph-id client)]
+                             (if (seq graph-id)
+                               (p/loop [uuids (distinct asset-uuids)]
+                                 (if (empty? uuids)
+                                   nil
+                                   (let [asset-uuid (first uuids)
+                                         conn (worker-state/get-datascript-conn repo)
+                                         ent (when conn (d/entity @conn [:block/uuid asset-uuid]))
+                                         asset-type (:logseq.property.asset/type ent)]
+                                     (p/do!
+                                      (when (seq asset-type)
+                                        (p/let [meta (worker-state/<invoke-main-thread
+                                                      :thread-api/get-asset-file-metadata
+                                                      repo (str asset-uuid) asset-type)]
+                                          (when (nil? meta)
+                                            (download-remote-asset! repo graph-id asset-uuid asset-type))))
+                                      (p/recur (rest uuids))))))
+                               (p/resolved nil)))))))
+
+(defn- enqueue-asset-initial-download!
+  [repo client]
+  (enqueue-asset-task! client
+                       (fn []
+                         (if-let [conn (worker-state/get-datascript-conn repo)]
+                           (let [db @conn
+                                 graph-id (:graph-id client)
+                                 remote-assets (d/q '[:find ?uuid ?type
+                                                      :where
+                                                      [?e :block/uuid ?uuid]
+                                                      [?e :logseq.property.asset/type ?type]
+                                                      [?e :logseq.property.asset/remote-metadata]]
+                                                    db)]
+                             (if (seq graph-id)
+                               (-> (p/let [paths (worker-state/<invoke-main-thread
+                                                  :thread-api/get-all-asset-file-paths
+                                                  repo)]
+                                     (let [local-uuids (into #{}
+                                                             (keep (fn [path]
+                                                                     (let [stem (path/file-stem path)]
+                                                                       (when (seq stem)
+                                                                         stem))))
+                                                             paths)
+                                           missing (remove (fn [[uuid _type]]
+                                                             (contains? local-uuids (str uuid)))
+                                                           remote-assets)]
+                                       (p/loop [entries missing]
+                                         (if (empty? entries)
+                                           nil
+                                           (let [[asset-uuid asset-type] (first entries)]
+                                             (p/do!
+                                              (download-remote-asset! repo graph-id asset-uuid asset-type)
+                                              (p/recur (rest entries))))))))
+                                   (p/catch (fn [e]
+                                              (log/error :worker-sync/asset-initial-download-failed
+                                                         {:repo repo :error e}))))
+                               (p/resolved nil)))
+                           (p/resolved nil)))))
+
 (defn- client-ops-conn [repo]
   (worker-state/get-client-ops-conn repo))
 
@@ -233,7 +489,9 @@
     (attach-ws-handlers! repo updated ws)
     (set! (.-onopen ws)
           (fn [_]
-            (send! ws {:type "hello" :client repo})))
+            (send! ws {:type "hello" :client repo})
+            (enqueue-asset-sync! repo updated)
+            (enqueue-asset-initial-download! repo updated)))
     (start-pull-loop! updated ws)))
 
 (defn start!
@@ -245,7 +503,9 @@
       (if (and (string? base) (seq base) (seq graph-id))
         (let [client (ensure-client-state! repo)
               url (format-ws-url base graph-id)
-              connected (connect! repo client url)]
+              _ (ensure-client-graph-uuid! repo graph-id)
+              connected (assoc client :graph-id graph-id)
+              connected (connect! repo connected url)]
           (swap! worker-state/*worker-sync-clients assoc repo connected)
           (p/resolved nil))
         (do
@@ -290,4 +550,6 @@
              (not (:worker-sync/remote? tx-meta))
              (not (:rtc-download-graph? tx-meta))
              (not (:from-disk? tx-meta)))
-    (enqueue-local-tx! repo tx-report)))
+    (enqueue-local-tx! repo tx-report)
+    (when-let [client (get @worker-state/*worker-sync-clients repo)]
+      (enqueue-asset-sync! repo client))))