|
|
@@ -1,7 +1,8 @@
|
|
|
(ns frontend.components.rtc.flows
|
|
|
(:require [frontend.state :as state]
|
|
|
[missionary.core :as m]
|
|
|
- [cljs-time.core :as t]))
|
|
|
+ [cljs-time.core :as t])
|
|
|
+ (:import [missionary Cancelled]))
|
|
|
|
|
|
(def rtc-log-flow
|
|
|
(m/watch (:rtc/log @state/state)))
|
|
|
@@ -30,19 +31,27 @@
|
|
|
[minutes]
|
|
|
(let [*buffer (atom {})]
|
|
|
(m/ap
|
|
|
- (let [latest-updates (m/?> (m/watch (:rtc/recent-updates @state/state)))]
|
|
|
- (when-let [graph-uuid (first (keys latest-updates))]
|
|
|
- (let [mins-ago (t/minus (t/now) (t/minutes minutes))
|
|
|
- latest-keys (map (fn [[user-uuid _]] user-uuid) (get latest-updates graph-uuid))
|
|
|
- new-map
|
|
|
- {graph-uuid
|
|
|
- (into {}
|
|
|
- (map (fn [k]
|
|
|
- [k
|
|
|
- (take-while
|
|
|
- (fn [[inst _]] (> inst mins-ago))
|
|
|
- (concat (get-in latest-updates [graph-uuid k])
|
|
|
- (get-in @*buffer [graph-uuid k])))]))
|
|
|
- latest-keys)}]
|
|
|
- (swap! *buffer merge new-map)
|
|
|
- @*buffer))))))
|
|
|
+ (let [{:keys [graph-uuid online-users]} (m/?< (m/watch (:rtc/state @state/state)))
|
|
|
+ user-uuid->user (into {} (map (juxt :user/uuid identity)) online-users)
|
|
|
+ graph-uuid (uuid graph-uuid)]
|
|
|
+ (try
|
|
|
+ (let [latest-updates (m/?< (m/watch (:rtc/recent-updates @state/state)))]
|
|
|
+ (when-let [graph-uuid* (first (keys latest-updates))]
|
|
|
+ (when (= graph-uuid graph-uuid*)
|
|
|
+ (let [mins-ago (t/minus (t/now) (t/minutes minutes))
|
|
|
+ latest-keys (map (fn [[user-uuid _]] user-uuid) (get latest-updates graph-uuid))
|
|
|
+ new-map
|
|
|
+ {graph-uuid
|
|
|
+ (into {}
|
|
|
+ (map (fn [user-uuid]
|
|
|
+ [user-uuid
|
|
|
+ {:user-info (user-uuid->user user-uuid)
|
|
|
+ :updates
|
|
|
+ (take-while
|
|
|
+ (fn [[inst _]] (> inst mins-ago))
|
|
|
+ (concat (get-in latest-updates [graph-uuid user-uuid])
|
|
|
+ (get-in @*buffer [graph-uuid user-uuid])))}]))
|
|
|
+ latest-keys)}]
|
|
|
+ (swap! *buffer merge new-map)
|
|
|
+ @*buffer))))
|
|
|
+ (catch Cancelled _))))))
|