|
|
@@ -107,6 +107,20 @@
|
|
|
mws-state (assoc :ws-state mws-state)))
|
|
|
(m/reductions {} nil mws-state-flow)))
|
|
|
|
|
|
+(def ^:private *rtc-lock (atom nil))
|
|
|
+(defn- holding-rtc-lock
|
|
|
+ "Use this fn to prevent multiple rtc-loops at same time.
|
|
|
+ rtc-loop-task is stateless, but conn is not.
|
|
|
+ we need to ensure that no two concurrent rtc-loop-tasks are modifying `conn` at the same time"
|
|
|
+ [task]
|
|
|
+ (m/sp
|
|
|
+ (when-not (compare-and-set! *rtc-lock nil true)
|
|
|
+ (throw (ex-info "Must not run multiple rtc-loops, try later" {:missionary/retry true})))
|
|
|
+ (try
|
|
|
+ (m/? task)
|
|
|
+ (finally
|
|
|
+ (compare-and-set! *rtc-lock true nil)))))
|
|
|
+
|
|
|
(defn create-rtc-loop
|
|
|
"Return a map with [:rtc-log-flow :rtc-state-flow :rtc-loop-task :*rtc-auto-push?]
|
|
|
TODO: auto refresh token if needed"
|
|
|
@@ -123,25 +137,26 @@
|
|
|
:rtc-state-flow (create-rtc-state-flow (create-mws-state-flow *current-mws))
|
|
|
:*rtc-auto-push? *auto-push?
|
|
|
:rtc-loop-task
|
|
|
- (m/sp
|
|
|
- (try
|
|
|
+ (holding-rtc-lock
|
|
|
+ (m/sp
|
|
|
+ (try
|
|
|
;; init run to open a ws
|
|
|
- (m/? get-mws-create-task)
|
|
|
- (->>
|
|
|
- (let [event (m/?> mixed-flow)]
|
|
|
- (case (:type event)
|
|
|
- :remote-update
|
|
|
- (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn)
|
|
|
+ (m/? get-mws-create-task)
|
|
|
+ (->>
|
|
|
+ (let [event (m/?> mixed-flow)]
|
|
|
+ (case (:type event)
|
|
|
+ :remote-update
|
|
|
+ (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn)
|
|
|
|
|
|
- :local-update-check
|
|
|
- (m/? (r.client/create-push-local-ops-task
|
|
|
- repo conn user-uuid graph-uuid date-formatter
|
|
|
- get-mws-create-task add-log-fn))))
|
|
|
- (m/ap)
|
|
|
- (m/reduce {} nil)
|
|
|
- (m/?))
|
|
|
- (catch Cancelled _
|
|
|
- (add-log-fn {:type ::cancelled}))))}))
|
|
|
+ :local-update-check
|
|
|
+ (m/? (r.client/create-push-local-ops-task
|
|
|
+ repo conn user-uuid graph-uuid date-formatter
|
|
|
+ get-mws-create-task add-log-fn))))
|
|
|
+ (m/ap)
|
|
|
+ (m/reduce {} nil)
|
|
|
+ (m/?))
|
|
|
+ (catch Cancelled _
|
|
|
+ (add-log-fn {:type ::cancelled})))))}))
|
|
|
|
|
|
(def send&recv r.client/send&recv)
|
|
|
|
|
|
@@ -161,6 +176,7 @@
|
|
|
(def rtc-state-flow rtc-state-flow)
|
|
|
(def *rtc-auto-push? *rtc-auto-push?)))
|
|
|
(cancel)
|
|
|
+
|
|
|
(def cancel2 ((m/reduce (fn [_ v] (prn :v v) v)
|
|
|
(m/latest vector rtc-state-flow (m/reductions {} nil rtc-log-flow)))
|
|
|
#(js/console.log :succ %) #(js/console.log :fail %)))
|