ソースを参照

enhance(rtc): update asset-sync

rcmerci 1 年間 前
コミット
84bc8163ff

+ 2 - 2
src/main/frontend/db_worker.cljs

@@ -424,7 +424,7 @@
    (when-let [conn (state/get-datascript-conn repo)]
      (async/go
        (try
-         (let [state (<! (rtc-core/<init-state repo token))]
+         (let [state (<! (rtc-core/<init-state repo token false))]
            (<! (rtc-updown/<upload-graph state repo conn))
            (rtc-db-listener/listen-db-to-generate-ops repo conn))
          (worker-util/post-message :notification
@@ -443,7 +443,7 @@
   (rtc-download-graph
    [this repo token graph-uuid]
    (async/go
-     (let [state (<! (rtc-core/<init-state repo token))]
+     (let [state (<! (rtc-core/<init-state repo token false))]
        (try
          (<? (rtc-updown/<download-graph state repo graph-uuid))
          (worker-util/post-message :notification

+ 83 - 27
src/main/frontend/worker/rtc/asset_sync.cljs

@@ -6,7 +6,6 @@
             [cljs.core.async :as async :refer [<! >! chan go go-loop]]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
-            [frontend.handler.user :as user]
             [frontend.worker.rtc.ws :as ws]
             [frontend.worker.async-util :include-macros true :refer [<?]]
             [datascript.core :as d]
@@ -17,10 +16,15 @@
    [:*graph-uuid :any]
    [:*repo :any]
    [:*db-conn :any]
+   [:*token :any]
+   [:*date-formatter :any]
+   [:*ws :any]
    [:*assets-update-state :any]
+   [:data-from-ws-chan :any]
    [:data-from-ws-pub :any]
    [:*auto-push-assets-update-ops? :any]
-   [:toggle-auto-push-assets-update-ops-chan :any]])
+   [:toggle-auto-push-assets-update-ops-chan :any]
+   [:*stop-asset-sync-loop-chan :any]])
 
 (def state-validator
   (let [validator (m/validator state-schema)]
@@ -29,6 +33,24 @@
         true
         (prn (mu/explain-data state-schema data))))))
 
+(defonce *asset-sync-state (atom nil))
+
+(defn init-state-from-rtc-state
+  [rtc-state]
+  {:post [(m/validate state-schema %)]}
+  {:*graph-uuid (atom nil)
+   :*repo (atom nil)
+   :*db-conn (atom nil)
+   :*token (:*token rtc-state)
+   :*date-formatter (atom nil)
+   :*ws (:*ws rtc-state)
+   :*assets-update-state (atom nil)
+   :data-from-ws-chan (:data-from-ws-chan rtc-state)
+   :data-from-ws-pub (:data-from-ws-pub rtc-state)
+   :*auto-push-assets-update-ops? (atom true :validator boolean?)
+   :toggle-auto-push-assets-update-ops-chan (chan (async/sliding-buffer 1))
+   :*stop-asset-sync-loop-chan (atom nil)})
+
 
 (defn- <push-data-from-ws-handler
   [repo push-data-from-ws]
@@ -41,33 +63,38 @@
   [state graph-uuid repo conn]
   (go-loop []
     (when-let [{min-epoch-asset-ops :ops asset-uuid :asset-uuid} (op-mem-layer/get-min-epoch-asset-ops repo)]
-      (try
-        (doseq [[tp _op] min-epoch-asset-ops]
-          (case tp
-            :update-asset
-            (let [asset-entity (d/pull @conn '[*] [:asset/uuid asset-uuid])
-                  r (<? (ws/<send&receive state {:action "update-assets" :graph-uuid graph-uuid
-                                                 :create [{:asset-uuid asset-uuid
-                                                           :asset-name (or (some-> asset-entity :asset/meta :name)
-                                                                           "default-name")}]}))]
-              (when (:ex-data r)
-                (throw (ex-info (:ex-message r) (:ex-data r)))))
+      (let [recur?
+            (try
+              (doseq [[tp _op] min-epoch-asset-ops]
+                (case tp
+                  :update-asset
+                  (let [asset-entity (d/pull @conn '[*] [:asset/uuid asset-uuid])
+                        r (<? (ws/<send&receive state {:action "update-assets" :graph-uuid graph-uuid
+                                                       :create [{:asset-uuid asset-uuid
+                                                                 :asset-name (or (some-> asset-entity :asset/meta :name)
+                                                                                 "default-name")}]}))]
+                    (when (:ex-data r)
+                      (throw (ex-info (:ex-message r) (:ex-data r)))))
 
-            :remove-asset
-            (let [r (<? (ws/<send&receive state {:action "update-assets" :graph-uuid graph-uuid
-                                                 :delete [asset-uuid]}))]
-              (when (:ex-data r)
-                (throw (ex-info (:ex-message r) (:ex-data r)))))))
-        (op-mem-layer/remove-asset-ops! repo asset-uuid)
-        (recur)
-        (catch :default e
-          (prn ::unknown-ex e))))))
+                  :remove-asset
+                  (let [r (<? (ws/<send&receive state {:action "update-assets" :graph-uuid graph-uuid
+                                                       :delete [asset-uuid]}))]
+                    (when (:ex-data r)
+                      (throw (ex-info (:ex-message r) (:ex-data r)))))))
+              (op-mem-layer/remove-asset-ops! repo asset-uuid)
+              :recur
+              (catch :default e
+                (prn ::unknown-ex e)
+                nil))]
+        (when (= :recur recur?)
+          (recur))))))
 
 
 (defn- <client-op-update-handler
   [state]
   {:pre [(some? @(:*graph-uuid state))
-         (some? @(:*repo state))]}
+         (some? @(:*repo state))
+         (some? @(:*db-conn state))]}
   (go
     (let [repo @(:*repo state)
           conn @(:*db-conn state)
@@ -82,19 +109,21 @@
     (go
       (<! (async/timeout 2000))
       ;; TODO: get-unpushed-assets-update-count
-      (pos? (op-mem-layer/get-unpushed-block-update-count repo)))))
+      (pos? (op-mem-layer/get-unpushed-asset-update-count repo)))))
 
 (defn <loop-for-assets-sync
-  [state graph-uuid repo]
+  [state graph-uuid repo conn]
   {:pre [(state-validator state)]}
   (go
     (reset! (:*repo state) repo)
     (reset! (:*graph-uuid state) graph-uuid)
+    (reset! (:*db-conn state) conn)
     (let [{:keys [data-from-ws-pub]} state
           *auto-push-assets-update-ops? (:*auto-push-assets-update-ops? state)
           toggle-auto-push-assets-update-ops-ch (:toggle-auto-push-assets-update-ops-chan state)
           push-data-from-ws-ch (chan (async/sliding-buffer 100) (map rtc-const/data-from-ws-coercer))
           stop-assets-sync-loop-chan (chan)]
+      (reset! (:*stop-asset-sync-loop-chan state) stop-assets-sync-loop-chan)
       (async/sub data-from-ws-pub "push-assets-updates" push-data-from-ws-ch)
       (<! (go-loop [push-assets-update-ops-ch
                     (make-push-assets-update-ops-timeout-ch repo (not @*auto-push-assets-update-ops?))]
@@ -117,8 +146,10 @@
                   (recur (make-push-assets-update-ops-timeout-ch repo (not @*auto-push-assets-update-ops?))))
 
                 client-assets-update
-                (let [maybe-exp (<! (user/<wrap-ensure-id&access-token
-                                     (<! (<client-op-update-handler state))))]
+                ;; TODO: <wrap-ensure-id&access-token, ensure token not expired
+                ;; because this ns is running in db-worker now, need to move(or copy) <wrap-ensure-id&access-token
+                ;; to db-worker again
+                (let [maybe-exp (<! (<client-op-update-handler state))]
                   (if (= :expired-token (:anom (ex-data maybe-exp)))
                     (prn ::<loop-for-assets-sync "quitting loop" maybe-exp)
                     (recur (make-push-assets-update-ops-timeout-ch repo (not @*auto-push-assets-update-ops?)))))
@@ -129,3 +160,28 @@
 
                 :else nil))))
       (async/unsub data-from-ws-pub "push-assets-update" push-data-from-ws-ch))))
+
+(comment
+  (go-loop []
+    (when-let [{min-epoch-asset-ops :ops asset-uuid :asset-uuid} (op-mem-layer/get-min-epoch-asset-ops repo)]
+      (try
+        (doseq [[tp _op] min-epoch-asset-ops]
+          (case tp
+            :update-asset
+            (let [asset-entity (d/pull @conn '[*] [:asset/uuid asset-uuid])
+                  r (<? (ws/<send&receive state {:action "update-assets" :graph-uuid graph-uuid
+                                                 :create [{:asset-uuid asset-uuid
+                                                           :asset-name (or (some-> asset-entity :asset/meta :name)
+                                                                           "default-name")}]}))]
+              (when (:ex-data r)
+                (throw (ex-info (:ex-message r) (:ex-data r)))))
+
+            :remove-asset
+            (let [r (<? (ws/<send&receive state {:action "update-assets" :graph-uuid graph-uuid
+                                                 :delete [asset-uuid]}))]
+              (when (:ex-data r)
+                (throw (ex-info (:ex-message r) (:ex-data r)))))))
+        (op-mem-layer/remove-asset-ops! repo asset-uuid)
+        (recur)
+        (catch :default e
+          (prn ::unknown-ex e))))))

+ 27 - 24
src/main/frontend/worker/rtc/core.cljs

@@ -25,10 +25,10 @@
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
             [frontend.worker.rtc.ws :as ws]
+            [frontend.worker.rtc.asset-sync :as asset-sync]
             [promesa.core :as p]
             [cljs-bean.core :as bean]))
 
-
 ;;                     +-------------+
 ;;                     |             |
 ;;                     |   server    |
@@ -99,7 +99,7 @@
    {:persist-op? false
     :transact-opts {:repo (first args)
                     :conn (second args)}}
-    (apply outliner-core/delete-blocks! args)))
+   (apply outliner-core/delete-blocks! args)))
 
 (defmethod transact-db! :move-blocks [_ & args]
   (outliner-tx/transact!
@@ -320,7 +320,6 @@
       (update-block-attrs repo conn date-formatter self op-value)
       (prn :apply-remote-move-ops self r parents left))))
 
-
 (defn apply-remote-update-ops
   [repo conn date-formatter update-ops]
   (prn :update-ops update-ops)
@@ -371,7 +370,6 @@
     (when-let [page-name (:block/name (d/entity @conn [:block/uuid (:block-uuid op)]))]
       (worker-page/delete! repo conn page-name nil {:redirect-to-home? false :persist-op? false}))))
 
-
 (defn filter-remote-data-by-local-unpushed-ops
   "when remote-data request client to move/update/remove/... blocks,
   these updates maybe not needed, because this client just updated some of these blocks,
@@ -465,7 +463,6 @@
       (when (= r ::need-pull-remote-data)
         r))))
 
-
 (defn- remove-non-exist-block-uuids-in-add-retract-map
   [conn add-retract-map]
   (let [{:keys [add retract]} add-retract-map
@@ -610,7 +607,6 @@
     {:remote-ops @*remote-ops
      :depend-on-block-uuids @*depend-on-block-uuid-set}))
 
-
 (defn gen-block-uuid->remote-ops
   [repo conn & {:keys [n] :or {n 50}}]
   (loop [current-handling-block-ops nil
@@ -690,7 +686,6 @@
                          block-uuid->remote-ops)]
     (concat update-page-ops remove-ops sorted-move-ops update-ops remove-page-ops)))
 
-
 (defn- <client-op-update-handler
   [state _token]
   {:pre [(some? @(:*graph-uuid state))
@@ -809,7 +804,6 @@
                 nil))))
       (async/unsub data-from-ws-pub "push-updates" push-data-from-ws-ch))))
 
-
 (defn <grant-graph-access-to-others
   [state graph-uuid & {:keys [target-user-uuids target-user-emails]}]
   (go
@@ -856,7 +850,6 @@
 ;;       (<! (ws/<send&receive state {:action "query-blocks" :graph-uuid @(:*graph-uuid state)
 ;;                                    :block-uuids [page-block-uuid]})))))
 
-
 (defn init-state
   [ws data-from-ws-chan repo token]
   ;; {:post [(m/validate state-schema %)]}
@@ -897,24 +890,30 @@
 
 ;; FIXME: token might be expired
 (defn <init-state
-  [repo token]
+  [repo token reset-*state?]
   (go
     (let [data-from-ws-chan (chan (async/sliding-buffer 100))
           ws-opened-ch (chan)
           ws (ws/ws-listen token data-from-ws-chan ws-opened-ch)]
       (<! ws-opened-ch)
       (let [state (init-state ws data-from-ws-chan repo token)]
-        (reset! *state state)
-        (swap! *state update :counter inc)
+        (when reset-*state?
+          (reset! *state state)
+          (swap! *state update :counter inc))
         state))))
 
 (defn <start-rtc
   [repo conn token]
   (go
-    (let [state (<! (<init-state repo token))
+    (let [state (<! (<init-state repo token true))
+          state-for-asset-sync (asset-sync/init-state-from-rtc-state state)
+          _ (reset! asset-sync/*asset-sync-state state-for-asset-sync)
           config (worker-state/get-config repo)]
       (if-let [graph-uuid (op-mem-layer/get-graph-uuid repo)]
-        (<! (<loop-for-rtc state graph-uuid repo conn (common-config/get-date-formatter config)))
+        (let [c1 (<loop-for-rtc state graph-uuid repo conn (common-config/get-date-formatter config))
+              c2 (asset-sync/<loop-for-assets-sync state-for-asset-sync graph-uuid repo conn)]
+          (<! c1)
+          (<! c2))
         (worker-util/post-message :notification (pr-str
                                                  [[:div
                                                    [:p "RTC is not supported for this graph"]]
@@ -922,10 +921,14 @@
 
 (defn <stop-rtc
   []
-  (when-let [chan (some-> @*state
-                           :*stop-rtc-loop-chan
-                           deref)]
-    (async/close! chan)))
+  (when-let [ch (some-> @*state
+                        :*stop-rtc-loop-chan
+                        deref)]
+    (async/close! ch))
+  (when-let [ch (some-> @asset-sync/*asset-sync-state
+                        :*stop-asset-sync-loop-chan
+                        deref)]
+    (async/close! ch)))
 
 (defn <toggle-sync
   []
@@ -936,12 +939,12 @@
   [repo token]
   (let [d (p/deferred)]
     (go
-     (let [state (or @*state (<! (<init-state repo token)))
-           graph-list (with-sub-data-from-ws state
-                        (<! (ws/<send! state {:req-id (get-req-id)
-                                              :action "list-graphs"}))
-                        (:graphs (<! (get-result-ch))))]
-       (p/resolve! d (bean/->js graph-list))))
+      (let [state (or @*state (<! (<init-state repo token true)))
+            graph-list (with-sub-data-from-ws state
+                         (<! (ws/<send! state {:req-id (get-req-id)
+                                               :action "list-graphs"}))
+                         (:graphs (<! (get-result-ch))))]
+        (p/resolve! d (bean/->js graph-list))))
     d))
 
 (add-watch *state :notify-main-thread

+ 8 - 0
src/main/frontend/worker/rtc/op_mem_layer.cljs

@@ -454,6 +454,14 @@
           keys
           count))
 
+(defn get-unpushed-asset-update-count
+  [repo]
+  (some-> (get @*ops-store repo)
+          :current-branch
+          :asset-uuid->ops
+          keys
+          count))
+
 (defn intersection-block-uuids
   [repo block-uuid-coll]
   (some->> (get @*ops-store repo)