| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- (ns frontend.handler.db-based.rtc-flows
- "Flows related to RTC"
- (:require [frontend.common.missionary :as c.m]
- [frontend.flows :as flows]
- [frontend.state :as state]
- [logseq.common.util :as common-util]
- [missionary.core :as m])
- (:import [missionary Cancelled]))
- (def rtc-log-flow
- (m/watch (:rtc/log @state/state)))
- (def rtc-download-log-flow
- (m/eduction
- (filter #(keyword-identical? :rtc.log/download (:type %)))
- rtc-log-flow))
- (def rtc-upload-log-flow
- (m/eduction
- (filter #(keyword-identical? :rtc.log/upload (:type %)))
- rtc-log-flow))
- (def rtc-misc-log-flow
- (m/eduction
- (remove #(contains? #{:rtc.log/download :rtc.log/upload} (:type %)))
- rtc-log-flow))
- (def rtc-state-flow
- (m/watch (:rtc/state @state/state)))
- (def rtc-running-flow
- (m/eduction (map :rtc-lock) rtc-state-flow))
- (def rtc-online-users-flow
- (c.m/throttle
- 500
- (m/eduction
- (map (fn [m]
- (when (and (= :open (:ws-state (:rtc-state m)))
- (:rtc-lock m))
- (:online-users m))))
- (dedupe)
- rtc-state-flow)))
- (def ^:private network-online-change-flow
- (m/stream
- (m/relieve
- (m/observe
- (fn ctor [emit!]
- (let [origin-callback js/window.ononline]
- (set! js/window.ononline emit!)
- (emit! nil)
- (fn dtor []
- (set! js/window.ononline origin-callback))))))))
- (def rtc-try-restart-flow
- "emit an event when it's time to restart rtc loop.
- conditions:
- - user logged in
- - no rtc loop running now
- - last rtc stop-reason is websocket message timeout
- - current js/navigator.onLine=true
- - throttle 5000ms"
- (->> (m/latest
- (fn [rtc-state _ login-user]
- (assoc rtc-state :login-user login-user))
- rtc-state-flow
- (c.m/continue-flow network-online-change-flow)
- flows/current-login-user-flow)
- (m/eduction
- (keep (fn [m]
- (let [{:keys [rtc-lock last-stop-exception-ex-data graph-uuid login-user]} m]
- (when (and (some? (:email login-user))
- (some? graph-uuid)
- (not rtc-lock) ; no rtc loop now
- (= :rtc.exception/ws-timeout (:type last-stop-exception-ex-data))
- (true? js/navigator.onLine))
- {:graph-uuid graph-uuid :t (common-util/time-ms)})))))
- (c.m/throttle 5000)))
- (def logout-or-graph-switch-flow
- (c.m/mix
- (m/eduction
- (filter #(= :logout %))
- flows/current-login-user-flow)
- (m/eduction
- (keep (fn [repo] (when repo :graph-switch)))
- flows/current-repo-flow)))
- (def ^:private *rtc-start-trigger (atom nil))
- (defn trigger-rtc-start
- [repo]
- (assert (some? repo))
- (reset! *rtc-start-trigger repo))
- (def ^:private document-visible&rtc-not-running-flow
- (m/ap
- (let [visibility (m/?< flows/document-visibility-state-flow)]
- (try
- (if (= "visible" visibility)
- (let [rtc-lock (:rtc-lock (m/? (c.m/snapshot-of-flow rtc-state-flow)))]
- (if (not rtc-lock)
- :document-visible&rtc-not-running
- (m/amb)))
- (m/amb))
- (catch Cancelled _
- (m/amb))))))
- (def ^:private network-online&rtc-not-running-flow
- (m/ap
- (let [online? (m/?< flows/network-online-event-flow)]
- (try
- (if online?
- (let [rtc-lock (:rtc-lock (m/? (c.m/snapshot-of-flow rtc-state-flow)))]
- (if (not rtc-lock)
- :network-online&rtc-not-running
- (m/amb)))
- (m/amb))
- (catch Cancelled _
- (m/amb))))))
- (def trigger-start-rtc-flow
- (->>
- [;; login-user changed
- (m/eduction
- (keep (fn [user] (when (:email user) [:login])))
- flows/current-login-user-flow)
- ;; repo changed
- (m/eduction
- (keep (fn [repo] (when repo [:graph-switch repo])))
- flows/current-repo-flow)
- ;; trigger-rtc by somewhere else
- (m/eduction
- (keep (fn [repo] (when repo [:trigger-rtc repo])))
- (m/watch *rtc-start-trigger))
- ;; document visibilitychange->true
- (m/eduction
- (map vector)
- document-visible&rtc-not-running-flow)
- ;; network online->true
- (m/eduction
- (map vector)
- network-online&rtc-not-running-flow)]
- (apply c.m/mix)
- (m/eduction (filter (fn [_] (some? (state/get-auth-id-token)))))
- (c.m/debounce 200)))
|