Browse Source

fix: sync start now waits for DB worker initialization

before invoking worker APIs
Tienson Qin 5 hours ago
parent
commit
5cb50683a8

+ 1 - 1
src/main/frontend/handler/db_based/rtc_background_tasks.cljs

@@ -32,7 +32,7 @@
         (c.m/<? (rtc-handler/<rtc-start! (state/get-current-repo) :stop-before-start? false)))))))
         (c.m/<? (rtc-handler/<rtc-start! (state/get-current-repo) :stop-before-start? false)))))))
 
 
 (run-background-task-when-not-publishing
 (run-background-task-when-not-publishing
- ;; stop rtc when [graph-switch user-logout]
+ ;; stop rtc when [user-logout]
  ::stop-rtc-if-needed
  ::stop-rtc-if-needed
  (m/reduce
  (m/reduce
   (constantly nil)
   (constantly nil)

+ 28 - 9
src/main/frontend/handler/db_based/sync.cljs

@@ -208,6 +208,24 @@
     true
     true
     (true? graph-e2ee?)))
     (true? graph-e2ee?)))
 
 
+(defn- <wait-for-db-worker-ready!
+  []
+  (if @state/*db-worker
+    (p/resolved true)
+    (let [ready (p/deferred)
+          watch-key (keyword "frontend.handler.db-based.sync"
+                             (str "wait-db-worker-ready-" (random-uuid)))]
+      (add-watch state/*db-worker watch-key
+                 (fn [_ _ _ worker]
+                   (when worker
+                     (remove-watch state/*db-worker watch-key)
+                     (p/resolve! ready true))))
+      ;; If worker becomes ready between the initial check and add-watch.
+      (when @state/*db-worker
+        (remove-watch state/*db-worker watch-key)
+        (p/resolve! ready true))
+      ready)))
+
 (defn <rtc-stop!
 (defn <rtc-stop!
   []
   []
   (log/info :db-sync/stop true)
   (log/info :db-sync/stop true)
@@ -215,15 +233,16 @@
 
 
 (defn <rtc-start!
 (defn <rtc-start!
   [repo & {:keys [_stop-before-start?] :as _opts}]
   [repo & {:keys [_stop-before-start?] :as _opts}]
-  (if (should-start-rtc? repo)
-    (do
-      (log/info :db-sync/start {:repo repo})
-      (state/<invoke-db-worker :thread-api/db-sync-start repo))
-    (do
-      (log/info :db-sync/skip-start {:repo repo :reason :graph-not-in-remote-list
-                                     :remote-graphs-loading? (:rtc/loading-graphs? @state/state)
-                                     :has-local-rtc-id? (graph-has-local-rtc-id? repo)})
-      (<rtc-stop!))))
+  (p/let [_ (<wait-for-db-worker-ready!)]
+    (if (should-start-rtc? repo)
+      (do
+        (log/info :db-sync/start {:repo repo})
+        (state/<invoke-db-worker :thread-api/db-sync-start repo))
+      (do
+        (log/info :db-sync/skip-start {:repo repo :reason :graph-not-in-remote-list
+                                       :remote-graphs-loading? (:rtc/loading-graphs? @state/state)
+                                       :has-local-rtc-id? (graph-has-local-rtc-id? repo)})
+        (<rtc-stop!)))))
 
 
 (defonce ^:private debounced-update-presence
 (defonce ^:private debounced-update-presence
   (util/debounce
   (util/debounce

+ 42 - 0
src/test/frontend/handler/db_based/sync_test.cljs

@@ -113,6 +113,48 @@
                           (is false (str e))
                           (is false (str e))
                           (done)))))))
                           (done)))))))
 
 
+(deftest rtc-start-waits-for-db-worker-before-start-test
+  (async done
+         (let [worker (atom nil)
+               called (atom [])]
+           (-> (p/with-redefs [state/get-rtc-graphs (fn [] [{:url "repo-current"}])
+                               state/*db-worker worker]
+                 (p/let [start-p (db-sync/<rtc-start! "repo-current")
+                         _ (p/delay 30)
+                         _ (is (empty? @called))
+                         _ (reset! worker
+                                   (fn [qkw direct-pass? & args]
+                                     (swap! called conj [qkw direct-pass? args])
+                                     (p/resolved :ok)))
+                         _ start-p]
+                   (is (= [[:thread-api/db-sync-start false ["repo-current"]]]
+                          @called))
+                   (done)))
+               (p/catch (fn [e]
+                          (is false (str e))
+                          (done)))))))
+
+(deftest rtc-start-waits-for-db-worker-before-stop-test
+  (async done
+         (let [worker (atom nil)
+               called (atom [])]
+           (-> (p/with-redefs [state/get-rtc-graphs (fn [] [{:url "repo-other"}])
+                               state/*db-worker worker]
+                 (p/let [start-p (db-sync/<rtc-start! "repo-current")
+                         _ (p/delay 30)
+                         _ (is (empty? @called))
+                         _ (reset! worker
+                                   (fn [qkw direct-pass? & args]
+                                     (swap! called conj [qkw direct-pass? args])
+                                     (p/resolved :ok)))
+                         _ start-p]
+                   (is (= [[:thread-api/db-sync-stop false []]]
+                          @called))
+                   (done)))
+               (p/catch (fn [e]
+                          (is false (str e))
+                          (done)))))))
+
 (deftest rtc-create-graph-persists-disabled-e2ee-flag-test
 (deftest rtc-create-graph-persists-disabled-e2ee-flag-test
   (async done
   (async done
          (let [fetch-called (atom nil)
          (let [fetch-called (atom nil)