فهرست منبع

fix(rtc): handle remote :graph-not-exist exception

rcmerci 1 سال پیش
والد
کامیت
1b9fd83f57

+ 3 - 1
src/main/frontend/db/rtc/debug_ui.cljs

@@ -54,7 +54,9 @@
                                  (swap! debug-state assoc
                                         :remote-graphs
                                         (map
-                                         #(select-keys % [:graph-uuid :graph-status])
+                                         #(into {}
+                                                (filter second
+                                                 (select-keys % [:graph-uuid :graph-status :group])))
                                          graph-list))))))]
 
      [:pre.select-text

+ 2 - 1
src/main/frontend/db_worker.cljs

@@ -569,7 +569,8 @@
 
   (rtc-stop
    [this]
-   (async-util/c->p (rtc-core/<stop-rtc)))
+   (when-let [state @rtc-core/*state]
+     (rtc-core/stop-rtc state)))
 
   (rtc-toggle-sync
    [this repo]

+ 1 - 1
src/main/frontend/worker/rtc/const.cljs

@@ -126,7 +126,7 @@
         [:op :keyword]
         [:block-uuid :uuid]]]]]]
    [:ex-data {:optional true} [:map [:type :keyword]]]
-   [:ex-message {:optional true} :any]])
+   [:ex-message {:optional true} :string]])
 
 (def data-from-ws-coercer (m/coercer data-from-ws-schema mt/string-transformer))
 (def data-from-ws-validator (m/validator data-from-ws-schema))

+ 112 - 62
src/main/frontend/worker/rtc/core.cljs

@@ -10,7 +10,7 @@
             [clojure.string :as string]
             [cognitect.transit :as transit]
             [datascript.core :as d]
-            [frontend.worker.async-util :include-macros true :refer [<?]]
+            [frontend.worker.async-util :include-macros true :refer [<? go-try]]
             [frontend.worker.handler.page :as worker-page]
             [frontend.worker.handler.page.rename :as worker-page-rename]
             [frontend.worker.rtc.asset-sync :as asset-sync]
@@ -51,6 +51,12 @@
 ;;                     |             |<--------------------+            |
 ;;                     +-------------+                     +------------+
 ;;                                frontend.worker.rtc.op/op-schema
+;;; exceptions ================================================================
+
+(def ex-break-rtc-loop (ex-info "break rtc loop" {:type ::break-rtc-loop}))
+
+;;; exceptions (ends)
+
 
 (def state-schema
   "
@@ -90,12 +96,15 @@
     (fn [data]
       (if (validator data)
         true
-        (prn (mu/explain-data state-schema data))))))
+        (do (prn (mu/explain-data state-schema data))
+            false)))))
 
 (def rtc-state-schema
   [:enum :open :closed])
 (def rtc-state-validator (m/validator rtc-state-schema))
 
+(defonce *state (atom nil :validator state-validator))
+
 (defn- update-log
   [state {:keys [local-ops remote-update-map]}]
   (when (:dev-mode? state)
@@ -838,11 +847,28 @@
                          block-uuid->remote-ops)]
     (concat update-page-ops remove-ops sorted-move-ops update-ops remove-page-ops)))
 
+(defmulti handle-remote-genernal-exception
+  "throw `ex-break-rtc-loop` when need to quit current rtc-loop"
+  (fn [resp & _] (comp :type :ex-data resp)))
+
+(declare remove-remote-graph-info stop-rtc stop-rtc-helper)
+(defmethod handle-remote-genernal-exception :graph-not-exist [_ state]
+  (when-let [repo (some-> state :*repo deref)]
+    (remove-remote-graph-info repo)
+    (stop-rtc-helper state)
+    (stop-rtc state)
+    (throw ex-break-rtc-loop)))
+
+
+(defmethod handle-remote-genernal-exception nil [resp & _]
+  (throw (ex-info "unknown exception from remote" {:resp resp})))
+
+
 (defn- <client-op-update-handler
   [state _token]
   {:pre [(some? @(:*graph-uuid state))
          (some? @(:*repo state))]}
-  (go
+  (go-try
     (let [repo @(:*repo state)
           conn @(:*db-conn state)
           date-formatter @(:*date-formatter state)]
@@ -872,6 +898,10 @@
               (do (prn ::get-s3-object-failed r)
                   (op-mem-layer/rollback! repo)
                   nil)
+
+              :graph-not-exist
+              (handle-remote-genernal-exception r state)
+
               ;; else
               (do (op-mem-layer/rollback! repo)
                   (throw (ex-info "Unavailable" {:remote-ex remote-ex}))))
@@ -881,9 +911,12 @@
                 (<! (<apply-remote-data state repo conn date-formatter r))
                 (prn :<client-op-update-handler :t (:t r)))))
         (catch :default e
-          (prn ::unknown-ex e)
-          (op-mem-layer/rollback! repo)
-          nil)))))
+          (case (:type (ex-data e))
+            ::break-rtc-loop (throw e)
+            ;; else
+            (do (prn ::unknown-ex e)
+                (op-mem-layer/rollback! repo)
+                nil)))))))
 
 (defn- make-push-client-ops-timeout-ch
   [repo never-timeout?]
@@ -893,7 +926,18 @@
       (<! (async/timeout 2000))
       (pos? (op-mem-layer/get-unpushed-block-update-count repo)))))
 
-(defonce *state (atom nil))
+(defn- remove-remote-graph-info
+  "when remote-graph is deleted or not-found,
+  remove remote-graph-info in client-side"
+  [repo]
+  (op-mem-layer/remove-ops-store! repo))
+
+(defn- stop-rtc-helper
+  [state]
+  (when-let [ws (some-> state :*ws deref)]
+    (ws/stop ws))
+  (when-let [*rtc-state (:*rtc-state state)]
+    (reset! *rtc-state :closed)))
 
 (defn <loop-for-rtc
   ":loop-started-ch used to notify that rtc-loop started"
@@ -916,58 +960,61 @@
       (reset! (:*stop-rtc-loop-chan state) stop-rtc-loop-chan)
       (<! (ws/<ensure-ws-open! state))
       (reset! (:*graph-uuid state) graph-uuid)
-      (with-sub-data-from-ws state
-        (<? (ws/<send! state {:action "register-graph-updates" :req-id (get-req-id) :graph-uuid graph-uuid}))
-        (<! (get-result-ch)))
-
-      (async/sub data-from-ws-pub "push-updates" push-data-from-ws-ch)
-      (when loop-started-ch (async/close! loop-started-ch))
-      (<! (go-loop [push-client-ops-ch
-                    (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))]
-            (let [{:keys [push-data-from-ws client-op-update stop continue]}
-                  (async/alt!
-                    toggle-auto-push-client-ops-ch {:continue true}
-                    force-push-client-ops-ch {:client-op-update true}
-                    push-client-ops-ch ([v] (if (and @*auto-push-client-ops? (true? v))
-                                              {:client-op-update true}
-                                              {:continue true}))
-                    push-data-from-ws-ch ([v] {:push-data-from-ws v})
-                    stop-rtc-loop-chan {:stop true}
-                    :priority true)]
-              (cond
-                continue
-                (recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?)))
-
-                push-data-from-ws
-                (let [r (<! (<push-data-from-ws-handler state repo conn date-formatter push-data-from-ws))]
-                  (when (= r ::need-pull-remote-data)
-                    ;; trigger a force push, which can pull remote-diff-data from local-t to remote-t
-                    (async/put! force-push-client-ops-ch true))
-                  (recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
-
-                client-op-update
-                ;; FIXME: access token expired
-                (let [_ (<! (<client-op-update-handler state token))]
-                  (recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
-
-                stop
-                (do (ws/stop @(:*ws state))
-                    (reset! (:*rtc-state state) :closed))
-
-                :else
-                nil))))
-      (async/unsub data-from-ws-pub "push-updates" push-data-from-ws-ch))))
+      (let [resp (<? (ws/<send&receive state {:action "register-graph-updates"
+                                              :graph-uuid graph-uuid}))]
+        (try
+          (when (:ex-data resp) (handle-remote-genernal-exception resp state))
+          (async/sub data-from-ws-pub "push-updates" push-data-from-ws-ch)
+          (when loop-started-ch (async/close! loop-started-ch))
+          (<! (go-loop [push-client-ops-ch
+                        (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))]
+                (let [{:keys [push-data-from-ws client-op-update stop continue]}
+                      (async/alt!
+                        toggle-auto-push-client-ops-ch {:continue true}
+                        force-push-client-ops-ch {:client-op-update true}
+                        push-client-ops-ch ([v] (if (and @*auto-push-client-ops? (true? v))
+                                                  {:client-op-update true}
+                                                  {:continue true}))
+                        push-data-from-ws-ch ([v] {:push-data-from-ws v})
+                        stop-rtc-loop-chan {:stop true}
+                        :priority true)]
+                  (cond
+                    continue
+                    (recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?)))
+
+                    push-data-from-ws
+                    (let [r (<! (<push-data-from-ws-handler state repo conn date-formatter push-data-from-ws))]
+                      (when (= r ::need-pull-remote-data)
+                        ;; trigger a force push, which can pull remote-diff-data from local-t to remote-t
+                        (async/put! force-push-client-ops-ch true))
+                      (recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
+
+                    client-op-update
+                    ;; FIXME: access token expired
+                    (let [_ (<! (<client-op-update-handler state token))]
+                      (recur (make-push-client-ops-timeout-ch repo (not @*auto-push-client-ops?))))
+
+                    stop
+                    (stop-rtc-helper state)
+
+                    :else
+                    nil))))
+          (async/unsub data-from-ws-pub "push-updates" push-data-from-ws-ch)
+          (catch :default e
+            (case (:type (ex-data e))
+              ::break-rtc-loop (prn :break-rtc-loop))))))))
+
+
+;;;  APIs ================================================================
 
 (defn <grant-graph-access-to-others
   [state graph-uuid & {:keys [target-user-uuids target-user-emails]}]
   (go
-    (let [r (with-sub-data-from-ws state
-              (<? (ws/<send! state (cond-> {:req-id (get-req-id)
-                                            :action "grant-access"
-                                            :graph-uuid graph-uuid}
-                                     target-user-uuids (assoc :target-user-uuids target-user-uuids)
-                                     target-user-emails (assoc :target-user-emails target-user-emails))))
-              (<! (get-result-ch)))]
+    (let [r (<? (ws/<send&receive state
+                                  (cond-> {:action "grant-access"
+                                           :graph-uuid graph-uuid}
+                                    target-user-uuids (assoc :target-user-uuids target-user-uuids)
+                                    target-user-emails (assoc :target-user-emails target-user-emails))))]
       (if-let [ex-message (:ex-message r)]
         (prn ::<grant-graph-access-to-others ex-message (:ex-data r))
         (prn ::<grant-graph-access-to-others :succ)))))
@@ -1082,16 +1129,17 @@
                                                   [:p "RTC is not supported for this graph"]]
                                                  :error])))))
 
-(defn <stop-rtc
-  []
-  (when-let [ch (some-> @*state
+(defn stop-rtc
+  [state]
+  (when-let [ch (some-> state
                         :*stop-rtc-loop-chan
                         deref)]
     (async/close! ch))
-  (when-let [ch (some-> @asset-sync/*asset-sync-state
-                        :*stop-asset-sync-loop-chan
-                        deref)]
-    (async/close! ch)))
+  ;; (when-let [ch (some-> @asset-sync/*asset-sync-state
+  ;;                       :*stop-asset-sync-loop-chan
+  ;;                       deref)]
+  ;;   (async/close! ch))
+  )
 
 (defn <toggle-sync
   []
@@ -1128,6 +1176,8 @@
           (p/resolve! d true))))
     d))
 
+;;; APIs (ends)
+
 (add-watch *state :notify-main-thread
            (fn [_ _ old new]
              (when-let [repo @(:*repo new)]