1
0
Эх сурвалжийг харах

enhance: c.m/backoff add :reset-flow option

We can use worker-flows/online-event-flow
to trigger a retry immediately upon receiving an 'online' event,
instead of waiting for the next retry delay.
rcmerci 6 сар өмнө
parent
commit
fd40bbeec5

+ 1 - 1
deps.edn

@@ -40,7 +40,7 @@
   logseq/shui                           {:local/root "deps/shui"}
   metosin/malli                         {:mvn/version "0.16.1"}
   com.cognitect/transit-cljs            {:mvn/version "0.8.280"}
-  missionary/missionary                 {:mvn/version "b.39"}
+  missionary/missionary                 {:mvn/version "b.44"}
   meander/epsilon                       {:mvn/version "0.0.650"}
 
   io.github.open-spaced-repetition/cljc-fsrs {:git/sha "0e70e96a73cf63c85dcc2df4d022edf12806b239"

+ 44 - 21
src/main/frontend/common/missionary.cljs

@@ -21,32 +21,55 @@
         (m/reductions {} init-value)
         (m/latest identity))))
 
-(def delays (reductions * 1000 (repeat 2)))
-
-(def ^:private retry-sentinel (js-obj))
-(defn backoff
-  "Retry task when it throw exception `(get ex-data :missionary/retry)`"
-  [delays-seq task]
-  (m/sp
-    (loop [[delay & rest-delays] (seq delays-seq)]
-      (let [r (try
-                (m/? task)
-                (catch :default e
-                  (if (and (some-> e ex-data :missionary/retry)
-                           (pos-int? delay))
-                    (do (m/? (m/sleep delay))
-                        (println :missionary/retry "after" delay "ms (" (ex-message e) ")")
-                        retry-sentinel)
-                    (throw e))))]
-        (if (identical? r retry-sentinel)
-          (recur rest-delays)
-          r)))))
-
 (defn mix
   "Return a flow which is mixed by `flows`"
   [& flows]
   (m/ap (m/?> (m/?> (count flows) (m/seed flows)))))
 
+(def never-flow (m/ap (m/? m/never)))
+
+(def delays (reductions * 1000 (repeat 2)))
+
+(def ^:private retry-sentinel (js-obj))
+(defn backoff
+  "Retry task when it throw exception `(get ex-data :missionary/retry)`
+  :delay-seq - retry delay-msecs
+  :reset-flow - retry immediately when getting value from flow and reset delays to init state"
+  [{:keys [delay-seq reset-flow]
+    :or {delay-seq (take 4 delays)
+         reset-flow never-flow}}
+   task]
+  (let [reset-flow* (mix reset-flow never-flow)]
+    (m/sp
+      (loop [[delay & rest-delays] (seq delay-seq)]
+        (let [r (try
+                  (m/? task)
+                  (catch :default e
+                    (if (and (some-> e ex-data :missionary/retry)
+                             (pos-int? delay))
+                      (let [delay-or-reset
+                            (m/? (m/race (m/sleep delay :delay)
+                                         (m/reduce (fn [_ r] (when r (reduced :reset))) nil
+                                                   (->> (continue-flow reset-flow*)
+                                                        (m/eduction (drop 1) (take 1))))))
+                            rest-delays*
+                            (case delay-or-reset
+                              :delay
+                              (do (println :missionary/retry "after" delay "ms (" (ex-message e) ")")
+                                  rest-delays)
+                              :reset
+                              (do (println :missionary/retry  "retry now (" (ex-message e) ")")
+                                  delay-seq))]
+                        [retry-sentinel rest-delays*])
+                      (throw e))))]
+          (if (and (vector? r)
+                   (first r) ;; if delete this `(first r)`,
+                       ;; the code continues to the next line even if r=0...
+                       ;; I suspect it's a bug in missionary.
+                   (identical? retry-sentinel (first r)))
+            (recur (second r))
+            r))))))
+
 (defn clock
   "Return a flow that emits `value` every `interval-ms`."
   ([interval-ms]

+ 4 - 2
src/main/frontend/handler.cljs

@@ -104,7 +104,9 @@
 (defn- handle-connection-change
   [e]
   (let [online? (= (gobj/get e "type") "online")]
-    (state/set-online! online?)))
+    (state/set-online! online?)
+    (state/<invoke-db-worker :thread-api/update-thread-atom
+                             :thread-atom/online-event online?)))
 
 (defn set-network-watcher!
   []
@@ -155,7 +157,6 @@
   (i18n/start)
   (instrument/init)
   (state/set-online! js/navigator.onLine)
-  (set-network-watcher!)
 
   (-> (util/indexeddb-check?)
       (p/catch (fn [_e]
@@ -177,6 +178,7 @@
                _ (if (empty? repos)
                    (repo-handler/new-db! config/demo-repo)
                    (restore-and-setup! repo))]
+         (set-network-watcher!)
          (when (util/electron?)
            (persist-db/run-export-periodically!))
          (when (mobile-util/native-platform?)

+ 1 - 0
src/main/frontend/worker/db_worker.cljs

@@ -25,6 +25,7 @@
             [frontend.worker.rtc.db-listener]
             [frontend.worker.search :as search]
             [frontend.worker.state :as worker-state]
+            [frontend.worker.thread-atom]
             [frontend.worker.undo-redo :as undo-redo]
             [frontend.worker.util :as worker-util]
             [goog.object :as gobj]

+ 13 - 0
src/main/frontend/worker/flows.cljs

@@ -0,0 +1,13 @@
+(ns frontend.worker.flows
+  "common flows in worker thread"
+  (:require [frontend.worker.state :as worker-state]
+            [missionary.core :as m]))
+
+(def online-event-flow
+  (->> (m/watch (get @worker-state/*state :thread-atom/online-event))
+       (m/eduction
+        (drop-while nil?)
+        (filter true?))))
+
+(comment
+  ((m/reduce (fn [_ x] (prn :xxx x)) online-event-flow) prn prn))

+ 5 - 1
src/main/frontend/worker/rtc/client.cljs

@@ -3,6 +3,7 @@
   (:require [clojure.string :as string]
             [datascript.core :as d]
             [frontend.common.missionary :as c.m]
+            [frontend.worker.flows :as worker-flows]
             [frontend.worker.rtc.branch-graph :as r.branch-graph]
             [frontend.worker.rtc.client-op :as client-op]
             [frontend.worker.rtc.exception :as r.ex]
@@ -64,7 +65,10 @@
           (let [{:keys [max-remote-schema-version]}
                 (m/?
                  (c.m/backoff
-                  (take 5 (drop 2 c.m/delays)) ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
+                  {:delay-seq
+                   ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
+                   (take 5 (drop 2 c.m/delays))
+                   :reset-flow worker-flows/online-event-flow}
                   (new-task--register-graph-updates get-ws-create-task graph-uuid major-schema-version repo)))]
             (when max-remote-schema-version
               (add-log-fn :rtc.log/higher-remote-schema-version-exists

+ 3 - 1
src/main/frontend/worker/rtc/ws.cljs

@@ -4,6 +4,7 @@
   https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs"
   (:require [cljs-http-missionary.client :as http]
             [frontend.common.missionary :as c.m]
+            [frontend.worker.flows :as worker-flows]
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.malli-schema :as rtc-schema]
             [missionary.core :as m]))
@@ -88,7 +89,8 @@
                (pos-int? open-ws-timeout))
           [retry-count open-ws-timeout])
   (c.m/backoff
-   (take retry-count c.m/delays)
+   {:delay-seq (take retry-count c.m/delays)
+    :reset-flow worker-flows/online-event-flow}
    (m/sp
      (try
        (if-let [ws (m/? (m/timeout (create-mws* url) open-ws-timeout))]

+ 6 - 1
src/main/frontend/worker/state.cljs

@@ -48,7 +48,12 @@
 
                        ;; new implementation
                        :undo/repo->ops (atom {})
-                       :redo/repo->ops (atom {})}))
+                       :redo/repo->ops (atom {})
+
+
+                       ;; thread atoms, these atoms' value are syncing from ui-thread
+                       :thread-atom/online-event (atom nil)
+                       }))
 
 (defonce *rtc-ws-url (atom nil))
 

+ 12 - 0
src/main/frontend/worker/thread_atom.cljs

@@ -0,0 +1,12 @@
+(ns frontend.worker.thread-atom
+  "atoms from ui-thread"
+  (:require [frontend.common.thread-api :as thread-api :refer [def-thread-api]]
+            [frontend.worker.state :as worker-state]))
+
+(def-thread-api :thread-api/update-thread-atom
+  [atom-key new-value]
+  (assert (and (keyword? atom-key)
+               (identical? "thread-atom" (namespace atom-key))))
+  (when-let [a (get @worker-state/*state atom-key)]
+    (reset! a new-value)
+    nil))

+ 4 - 4
src/rtc_e2e_test/client_steps.cljs

@@ -44,7 +44,7 @@
        (is (nil? r)))
      (m/?
       (c.m/backoff
-       (take 4 c.m/delays)
+       {}
        (m/sp
          (let [conn (helper/get-downloaded-test-conn)
                page1 (d/pull @conn '[*] [:block/uuid const/page1-uuid])
@@ -72,7 +72,7 @@
        (m/? (helper/new-task--wait-all-client-ops-sent))))
    :client2
    (c.m/backoff
-    (take 4 c.m/delays)
+    {}
     (m/sp
       (let [conn (helper/get-downloaded-test-conn)
             page (d/pull @conn '[*] [:block/uuid const/page2-uuid])]
@@ -107,7 +107,7 @@
        (m/? (helper/new-task--wait-all-client-ops-sent))))
    :client2
    (c.m/backoff
-    (take 4 c.m/delays)
+    {}
     (m/sp
       (let [conn (helper/get-downloaded-test-conn)
             block1 (d/pull @conn
@@ -231,7 +231,7 @@ client2:
        (m/? (helper/new-task--client2-sync-barrier-1->2 "step6"))
        (m/?
         (c.m/backoff
-         (take 4 c.m/delays)
+         {}
          (m/sp
            (let [page (d/pull @conn '[*] [:block/uuid const/step6-page-uuid])
                  page-blocks (when-let [page-id (:db/id page)]

+ 7 - 7
src/rtc_e2e_test/helper.cljs

@@ -25,7 +25,7 @@
 (defn new-task--wait-creating-graph
   [graph-uuid]
   (c.m/backoff
-   (take 4 c.m/delays)
+   {}
    (m/sp
      (let [graphs (m/? (rtc.core/new-task--get-graphs const/test-token))
            graph (some (fn [graph] (when (= graph-uuid (:graph-uuid graph)) graph)) graphs)]
@@ -47,7 +47,7 @@
 
 (def new-task--get-remote-example-graph-uuid
   (c.m/backoff
-   (take 5 c.m/delays)
+   {}
    (m/sp
      (let [graphs (m/? (rtc.core/new-task--get-graphs const/test-token))
            graph
@@ -94,11 +94,11 @@
   #_:clj-kondo/ignore
   (me/find
    client-op
-   [?op-type _ {:block-uuid ?block-uuid :av-coll [[!a !v _ !add] ...]}]
-   [?op-type ?block-uuid (map vector !a !v !add)]
+    [?op-type _ {:block-uuid ?block-uuid :av-coll [[!a !v _ !add] ...]}]
+    [?op-type ?block-uuid (map vector !a !v !add)]
 
-   [?op-type _ {:block-uuid ?block-uuid}]
-   [?op-type ?block-uuid]))
+    [?op-type _ {:block-uuid ?block-uuid}]
+    [?op-type ?block-uuid]))
 
 (defn new-task--wait-all-client-ops-sent
   [& {:keys [timeout] :or {timeout 10000}}]
@@ -144,7 +144,7 @@
   "Return a task that return message from other client"
   [block-title-pred-fn & {:keys [retry-message retry-count] :or {retry-count 4}}]
   (c.m/backoff
-   (take retry-count c.m/delays)
+   {:delay-seq (take retry-count c.m/delays)}
    (m/sp
      (let [conn (get-downloaded-test-conn)
            message-page-id (:db/id (ldb/get-page @conn const/message-page-uuid))