Browse Source

fix(rtc): ensure asset synced using push-asset-block-updates messages

rcmerci 4 months ago
parent
commit
bbe8f890ee

+ 39 - 17
src/main/frontend/worker/rtc/asset.cljs

@@ -50,24 +50,37 @@
                                        repo (str block-uuid) asset-type))))
 
 (defn- remote-block-ops=>remote-asset-ops
-  [db-before remove-ops]
-  (keep
-   (fn [remove-op]
-     (let [block-uuid (:block-uuid remove-op)]
-       (when-let [ent (d/entity db-before [:block/uuid block-uuid])]
-         (when-let [asset-type (:logseq.property.asset/type ent)]
-           {:op :remove-asset
-            :block/uuid block-uuid
-            :logseq.property.asset/type asset-type}))))
-   remove-ops))
+  [db-before db-after remove-ops update-ops]
+  (concat
+   (keep
+    (fn [remove-op]
+      (let [block-uuid (:block-uuid remove-op)]
+        (when-let [ent (d/entity db-before [:block/uuid block-uuid])]
+          (when-let [asset-type (:logseq.property.asset/type ent)]
+            {:op :remove-asset
+             :block/uuid block-uuid
+             :logseq.property.asset/type asset-type}))))
+    remove-ops)
+   (keep
+    (fn [update-op]
+      (let [block-uuid (:self update-op)]
+        (when-let [ent (d/entity db-after [:block/uuid block-uuid])]
+          (let [remote-metadata (:logseq.property.asset/remote-metadata ent)
+                checksum (:logseq.property.asset/checksum ent)
+                asset-type (:logseq.property.asset/type ent)]
+            (when (and remote-metadata checksum asset-type)
+              {:op :update-asset
+               :block/uuid block-uuid})))))
+    update-ops)))
 
 (defn emit-remote-asset-updates-from-block-ops
-  [db-before remove-ops]
+  [db-before db-after remove-ops update-ops]
   (when-let [asset-update-ops
-             (not-empty (remote-block-ops=>remote-asset-ops db-before remove-ops))]
+             (not-empty (remote-block-ops=>remote-asset-ops db-before db-after remove-ops update-ops))]
     (reset! *remote-asset-updates asset-update-ops)))
 
 (defn new-task--emit-remote-asset-updates-from-push-asset-upload-updates
+  "deprecated"
   [repo db push-asset-upload-updates-message]
   (m/sp
     (let [{:keys [uploaded-assets]} push-asset-upload-updates-message]
@@ -228,16 +241,25 @@
                   asset-update-ops)
             asset-uuid->asset-type (into {}
                                          (keep (fn [asset-uuid]
-                                                 (when-let [tp (:logseq.property.asset/type
-                                                                (d/entity @conn [:block/uuid asset-uuid]))]
-                                                   [asset-uuid tp])))
+                                                 (when-let [ent (d/entity @conn [:block/uuid asset-uuid])]
+                                                   (let [asset-type (:logseq.property.asset/type ent)]
+                                                     [asset-uuid asset-type]))))
                                          update-asset-uuids)
             asset-uuid->url
-            (when (seq asset-uuid->asset-type)
+            (when-let [asset-uuids
+                       (->> asset-uuid->asset-type
+                            (map
+                             (fn [[asset-uuid asset-type]]
+                               (m/sp
+                                 (when (nil? (m/? (new-task--get-asset-file-metadata repo asset-uuid asset-type)))
+                                   asset-uuid))))
+                            (apply m/join vector)
+                            m/?
+                            (remove nil?))]
               (->> (m/? (ws-util/send&recv get-ws-create-task
                                            {:action "get-assets-download-urls"
                                             :graph-uuid graph-uuid
-                                            :asset-uuids (keys asset-uuid->asset-type)}))
+                                            :asset-uuids asset-uuids}))
                    :asset-uuid->url))]
         (doseq [[asset-uuid asset-type] remove-asset-uuid->asset-type]
           (c.m/<? (worker-state/<invoke-main-thread :thread-api/unlink-asset

+ 11 - 6
src/main/frontend/worker/rtc/core.cljs

@@ -41,7 +41,8 @@
   and filter messages with :req-id=
   - `push-updates`
   - `online-users-updated`.
-  - `push-asset-upload-updates`"
+  - `push-asset-upload-updates`
+  - `push-asset-block-updates`"
   [get-ws-create-task]
   (m/ap
     (loop []
@@ -52,7 +53,8 @@
                                  (contains?
                                   #{"online-users-updated"
                                     "push-updates"
-                                    "push-asset-upload-updates"}
+                                    "push-asset-upload-updates"
+                                    "push-asset-block-updates"}
                                   (:req-id data))))
                        (ws/recv-flow ws)))
                 (catch js/CloseEvent _
@@ -134,7 +136,9 @@
                                     (case (:req-id data)
                                       "push-updates" {:type :remote-update :value data}
                                       "online-users-updated" {:type :online-users-updated :value data}
-                                      "push-asset-upload-updates" {:type :remote-asset-update :value data})))
+                                      ;; TODO: push-asset-upload-updates is deprecated, del it later
+                                      "push-asset-upload-updates" {:type :remote-asset-update :value data}
+                                      "push-asset-block-updates" {:type :remote-asset-block-update :value data})))
                              (get-remote-updates get-ws-create-task))
         local-updates-check-flow (m/eduction
                                   (map (fn [data] {:type :local-update-check :value data}))
@@ -259,15 +263,16 @@
           (->>
            (let [event (m/?> mixed-flow)]
              (case (:type event)
-               :remote-update
+               (:remote-update :remote-asset-block-update)
                (try (r.remote-update/apply-remote-update graph-uuid repo conn date-formatter event add-log-fn)
                     (catch :default e
                       (when (= ::r.remote-update/need-pull-remote-data (:type (ex-data e)))
                         (m/? (r.client/new-task--pull-remote-data
                               repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn)))))
                :remote-asset-update
-               (m/? (r.asset/new-task--emit-remote-asset-updates-from-push-asset-upload-updates
-                     repo @conn (:value event)))
+               nil
+               ;; (m/? (r.asset/new-task--emit-remote-asset-updates-from-push-asset-upload-updates
+               ;;       repo @conn (:value event)))
 
                :local-update-check
                (m/? (r.client/new-task--push-local-ops

+ 1 - 0
src/main/frontend/worker/rtc/malli_schema.cljs

@@ -196,6 +196,7 @@
        [:t :int]
        [:max-remote-schema-version {:optional true} :string]]]
      ["apply-ops" apply-ops-response-schema]
+     ["push-asset-block-updates" apply-ops-response-schema]
      ["branch-graph"
       [:map
        [:graph-uuid :uuid]

+ 2 - 1
src/main/frontend/worker/rtc/remote_update.cljs

@@ -610,7 +610,8 @@ so need to pull earlier remote-data from websocket."})
           (worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops))
           ;; wait all remote-ops transacted into db,
           ;; then start to check any asset-updates in remote
-          (r.asset/emit-remote-asset-updates-from-block-ops db-before remove-ops)
+          (let [db-after @conn]
+            (r.asset/emit-remote-asset-updates-from-block-ops db-before db-after remove-ops update-ops))
           (js/console.groupEnd)
 
           (client-op/update-local-tx repo remote-t)