瀏覽代碼

feat(rtc): add pull-remote-updates-flow

rcmerci 1 年之前
父節點
當前提交
2ec3ef36dd
共有 1 個文件被更改,包括 37 次插入5 次删除
  1. 37 5
      src/main/frontend/worker/rtc/core.cljs

+ 37 - 5
src/main/frontend/worker/rtc/core.cljs

@@ -52,11 +52,32 @@
                 (filter (fn [v] (when (pos? (client-op/get-unpushed-ops-count repo)) v)))
                 merge-flow)))
 
+(defn- create-pull-remote-updates-flow
+  "Return a flow: emit to pull remote-updates.
+  reschedule next emit(INTERVAL-MS later) every time FLOW emit a value."
+  [interval-ms flow]
+  (let [v {:type :pull-remote-updates}
+        clock-flow (m/ap
+                     (loop []
+                       (m/amb
+                        (m/? (m/sleep interval-ms v))
+                        (recur))))]
+    (m/ap
+      (m/amb
+       v
+       (let [_ (m/?< (->> flow
+                          (m/reductions {} nil)
+                          (m/latest identity)))]
+         (try
+           (m/?< clock-flow)
+           (catch Cancelled _ (m/amb))))))))
+
 (defn- create-mixed-flow
   "Return a flow that emits all kinds of events:
   `:remote-update`: remote-updates data from server
   `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote.
-  `:online-users-updated`: online users info updated"
+  `:online-users-updated`: online users info updated
+  `:pull-remote-updates`: pull remote updates"
   [repo get-ws-create-task *auto-push?]
   (let [remote-updates-flow (m/eduction
                              (map (fn [data]
@@ -66,8 +87,9 @@
                              (get-remote-updates get-ws-create-task))
         local-updates-check-flow (m/eduction
                                   (map (fn [data] {:type :local-update-check :value data}))
-                                  (create-local-updates-check-flow repo *auto-push? 2000))]
-    (c.m/mix remote-updates-flow local-updates-check-flow)))
+                                  (create-local-updates-check-flow repo *auto-push? 2000))
+        mix-flow (m/stream (c.m/mix remote-updates-flow local-updates-check-flow))]
+    (c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow))))
 
 (defn- new-task--get-ws-create
   "Return a map with atom *current-ws and a task
@@ -182,7 +204,11 @@
                      get-ws-create-task add-log-fn))
 
                :online-users-updated
-               (reset! *online-users (:online-users (:value event)))))
+               (reset! *online-users (:online-users (:value event)))
+
+               :pull-remote-updates
+               (m/? (r.client/new-task--pull-remote-data
+                     repo conn graph-uuid date-formatter get-ws-create-task add-log-fn))))
            (m/ap)
            (m/reduce {} nil)
            (m/?))
@@ -387,4 +413,10 @@
       (def cancel c)
       (def rtc-state-flow rtc-state-flow)
       (def *rtc-auto-push? *rtc-auto-push?)))
-  (cancel))
+  (cancel)
+
+  (do
+    (def a (atom 1))
+    (def f1 (m/watch a))
+    (def f2 (create-pull-remote-updates-flow 5000 f1))
+    (def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx))))